提交 bf896026 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -104,7 +104,7 @@ Each row contains the device ID, time stamp, collected metrics (current, voltage
## Metric
Metric refers to the physical quantity collected by sensors, equipment or other types of data collection devices, such as current, voltage, temperature, pressure, GPS position, etc., which change with time, and the data type can be integer, float, Boolean, or strings. As time goes by, the amount of collected metric data stored increases. In the meters example, current, voltage and phase are the metrics.
Metric refers to the physical quantity collected by sensors, equipment or other types of data collection devices, such as current, voltage, temperature, pressure, GPS position, etc., which change with time, and the data type can be integer, float, Boolean, or strings. As time goes by, the amount of collected metric data stored increases. In the smart meters example, current, voltage and phase are the metrics.
## Label/Tag
......@@ -112,7 +112,7 @@ Label/Tag refers to the static properties of sensors, equipment or other types o
## Data Collection Point
Data Collection Point (DCP) refers to hardware or software that collects metrics based on preset time periods or triggered by events. A data collection point can collect one or multiple metrics, but these metrics are collected at the same time and have the same time stamp. For some complex equipment, there are often multiple data collection points, and the sampling rate of each collection point may be different, and fully independent. For example, for a car, there could be a data collection point to collect GPS position metrics, a data collection point to collect engine status metrics, and a data collection point to collect the environment metrics inside the car. So in this example the car would have three data collection points. In the meters example, d1001, d1002, d1003, and d1004 are the data collection points.
Data Collection Point (DCP) refers to hardware or software that collects metrics based on preset time periods or triggered by events. A data collection point can collect one or multiple metrics, but these metrics are collected at the same time and have the same time stamp. For some complex equipment, there are often multiple data collection points, and the sampling rate of each collection point may be different, and fully independent. For example, for a car, there could be a data collection point to collect GPS position metrics, a data collection point to collect engine status metrics, and a data collection point to collect the environment metrics inside the car. So in this example the car would have three data collection points. In the smart meters example, d1001, d1002, d1003, and d1004 are the data collection points.
## Table
......@@ -137,7 +137,7 @@ The design of one table for one data collection point will require a huge number
STable is a template for a type of data collection point. A STable contains a set of data collection points (tables) that have the same schema or data structure, but with different static attributes (tags). To describe a STable, in addition to defining the table structure of the metrics, it is also necessary to define the schema of its tags. The data type of tags can be int, float, string, and there can be multiple tags, which can be added, deleted, or modified afterward. If the whole system has N different types of data collection points, N STables need to be established.
In the design of TDengine, **a table is used to represent a specific data collection point, and STable is used to represent a set of data collection points of the same type**. In the meters example, we can create a super table named `meters`.
In the design of TDengine, **a table is used to represent a specific data collection point, and STable is used to represent a set of data collection points of the same type**. In the smart meters example, we can create a super table named `meters`.
## Subtable
......@@ -156,9 +156,9 @@ The relationship between a STable and the subtables created based on this STable
Queries can be executed on both a table (subtable) and a STable. For a query on a STable, TDengine will treat the data in all its subtables as a whole data set for processing. TDengine will first find the subtables that meet the tag filter conditions, then scan the time-series data of these subtables to perform aggregation operation, which reduces the number of data sets to be scanned which in turn greatly improves the performance of data aggregation across multiple DCPs. In essence, querying a supertable is a very efficient aggregate query on multiple DCPs of the same type.
In TDengine, it is recommended to use a subtable instead of a regular table for a DCP. In the meters example, we can create subtables like d1001, d1002, d1003, and d1004 under super table meters.
In TDengine, it is recommended to use a subtable instead of a regular table for a DCP. In the smart meters example, we can create subtables like d1001, d1002, d1003, and d1004 under super table meters.
To better understand the data model using super table and subtable, please refer to the diagram below which demonstrates the data model of meters example. ![Meters Data Model Diagram](./supertable.webp)
To better understand the data model using metri, tags, super table and subtable, please refer to the diagram below which demonstrates the data model of the smart meters example. ![Meters Data Model Diagram](./supertable.webp)
## Database
......
......@@ -11,7 +11,7 @@ When using TDengine to store and query data, the most important part of the data
- The format must be `YYYY-MM-DD HH:mm:ss.MS`, the default time precision is millisecond (ms), for example `2017-08-12 18:25:58.128`
- Internal function `now` can be used to get the current timestamp on the client side
- The current timestamp of the client side is applied when `now` is used to insert data
- Epoch Time:timestamp can also be a long integer number, which means the number of seconds, milliseconds or nanoseconds, depending on the time precision, from 1970-01-01 00:00:00.000 (UTC/GMT)
- Epoch Time:timestamp can also be a long integer number, which means the number of seconds, milliseconds or nanoseconds, depending on the time precision, from UTC 1970-01-01 00:00:00.
- Add/subtract operations can be carried out on timestamps. For example `now-2h` means 2 hours prior to the time at which query is executed. The units of time in operations can be b(nanosecond), u(microsecond), a(millisecond), s(second), m(minute), h(hour), d(day), or w(week). So `select * from t1 where ts > now-2w and ts <= now-1w` means the data between two weeks ago and one week ago. The time unit can also be n (calendar month) or y (calendar year) when specifying the time window for down sampling operations.
Time precision in TDengine can be set by the `PRECISION` parameter when executing `CREATE DATABASE`. The default time precision is millisecond. In the statement below, the precision is set to nanonseconds.
......
......@@ -442,6 +442,26 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver);
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
int8_t sysInfo;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......@@ -473,6 +493,14 @@ int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct {
STableMetaRsp* pMeta;
} SMCreateStbRsp;
int32_t tEncodeSMCreateStbRsp(SEncoder* pEncoder, const SMCreateStbRsp* pRsp);
int32_t tDecodeSMCreateStbRsp(SDecoder* pDecoder, SMCreateStbRsp* pRsp);
void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
......@@ -1241,24 +1269,6 @@ typedef struct {
SVgroupInfo vgroups[];
} SVgroupsInfo;
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
int8_t sysInfo;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
STableMetaRsp* pMeta;
} SMAlterStbRsp;
......@@ -1269,7 +1279,7 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(void* pRsp);
void tFreeSTableIndexRsp(void* info);
typedef struct {
......@@ -2031,11 +2041,13 @@ int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
int32_t code;
STableMetaRsp* pMeta;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
void tFreeSVCreateTbRsp(void* param);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
......
......@@ -215,6 +215,7 @@ void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
char* jobTaskStatusStr(int32_t status);
......
......@@ -210,6 +210,8 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
*/
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
int64_t taosHashGetCompTimes(SHashObj *pHashObj);
#ifdef __cplusplus
}
#endif
......
......@@ -369,8 +369,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData*
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest);
#ifdef __cplusplus
......
......@@ -782,6 +782,10 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS;
......@@ -804,6 +808,19 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code = handleAlterTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_CREATE_TABLE: {
SArray* pList = (SArray*)pRes->res;
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
void* res = taosArrayGetP(pList, i);
code = handleCreateTbExecRes(res, pCatalog);
}
break;
}
case TDMT_MND_CREATE_STB: {
code = handleCreateTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_SUBMIT: {
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
......@@ -863,17 +880,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
return;
}
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
removeMeta(pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......@@ -934,6 +947,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
qDestroyQuery(pQuery);
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
......@@ -1132,10 +1149,6 @@ SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool valida
inRetry = true;
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
return pRequest;
}
......
......@@ -233,13 +233,36 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
} else {
SMCreateStbRsp createRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMCreateStbRsp(&coder, &createRsp);
tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
pRequest->body.resInfo.execRes.res = createRsp.pMeta;
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
removeMeta(pRequest->pTscObj, pRequest->tableList);
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if (code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
int32_t ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (pRes->res != NULL) {
ret = handleCreateTbExecRes(pRes->res, pCatalog);
}
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
}
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);
......
......@@ -3196,12 +3196,16 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
if (totalCols > 0) {
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
}
} else {
pRsp->pSchemas = NULL;
}
return 0;
......@@ -3326,7 +3330,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
return 0;
}
void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); }
void tFreeSTableMetaRsp(void *pRsp) { taosMemoryFreeClear(((STableMetaRsp*)pRsp)->pSchemas); }
void tFreeSTableIndexRsp(void *info) {
if (NULL == info) {
......@@ -5092,6 +5096,10 @@ int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->code) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->pMeta ? 1 : 0) < 0) return -1;
if (pRsp->pMeta) {
if (tEncodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
......@@ -5102,10 +5110,32 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) {
if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1;
int32_t meta = 0;
if (tDecodeI32(pCoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1;
} else {
pRsp->pMeta = NULL;
}
tEndDecode(pCoder);
return 0;
}
void tFreeSVCreateTbRsp(void* param) {
if (NULL == param) {
return;
}
SVCreateTbRsp* pRsp = (SVCreateTbRsp*)param;
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
}
}
// TDMT_VND_DROP_TABLE =================
static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
......@@ -5560,6 +5590,60 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
}
}
int32_t tEncodeSMCreateStbRsp(SEncoder *pEncoder, const SMCreateStbRsp *pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->pMeta->pSchemas ? 1 : 0) < 0) return -1;
if (pRsp->pMeta->pSchemas) {
if (tEncodeSTableMetaRsp(pEncoder, pRsp->pMeta) < 0) return -1;
}
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) {
int32_t meta = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(pDecoder, pRsp->pMeta) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
int32_t tDeserializeSMCreateStbRsp(void *buf, int32_t bufLen, SMCreateStbRsp *pRsp) {
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(&decoder, pRsp->pMeta) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) {
if (NULL == pRsp) {
return;
}
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
}
}
int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
......
......@@ -35,6 +35,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndFreeStb(SStbObj *pStb);
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
......
......@@ -1774,6 +1774,67 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
return 0;
}
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen) {
int32_t ret = -1;
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (NULL == pDb) {
return -1;
}
SStbObj *pObj = mndAcquireStb(pMnode, stbFName);
if (NULL == pObj) {
goto _OVER;
}
SEncoder ec = {0};
uint32_t contLen = 0;
SMCreateStbRsp stbRsp = {0};
SName name = {0};
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == stbRsp.pMeta) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
if (ret) {
tFreeSMCreateStbRsp(&stbRsp);
goto _OVER;
}
tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, ret);
if (ret) {
tFreeSMCreateStbRsp(&stbRsp);
goto _OVER;
}
void *cont = taosMemoryMalloc(contLen);
tEncoderInit(&ec, cont, contLen);
tEncodeSMCreateStbRsp(&ec, &stbRsp);
tEncoderClear(&ec);
tFreeSMCreateStbRsp(&stbRsp);
*pCont = cont;
*pLen = contLen;
ret = 0;
_OVER:
if (pObj) {
mndReleaseStb(pMnode, pObj);
}
if (pDb) {
mndReleaseDb(pMnode, pDb);
}
return ret;
}
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
void *alterOriData, int32_t alterOriDataLen) {
int32_t code = -1;
......
......@@ -17,6 +17,7 @@
#include "mndTrans.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSync.h"
......@@ -900,15 +901,6 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
SRpcMsg rspMsg = {.code = code, .info = *pInfo};
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
......@@ -924,6 +916,21 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
mndReleaseDb(pMnode, pDb);
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
void *pCont = NULL;
int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
}
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
tmsgSendRsp(&rspMsg);
......
......@@ -102,7 +102,7 @@ int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp **pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
......
......@@ -367,7 +367,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
return 0;
}
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
SMetaEntry me = {0};
SMetaReader mr = {0};
......@@ -427,6 +427,21 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
if (pMetaRsp) {
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (*pMetaRsp) {
if (me.type == TSDB_CHILD_TABLE) {
(*pMetaRsp)->tableType = TSDB_CHILD_TABLE;
(*pMetaRsp)->tuid = pReq->uid;
(*pMetaRsp)->suid = pReq->ctb.suid;
strcpy((*pMetaRsp)->tbName, pReq->name);
} else {
metaUpdateMetaRsp(pReq->uid, pReq->name, &pReq->ntb.schemaRow, *pMetaRsp);
}
}
}
metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
pReq->type);
return 0;
......
......@@ -368,6 +368,10 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
}
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
if (NULL == pMetaRsp) {
return;
}
strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
pMetaRsp->dbId = pVnode->config.dbId;
pMetaRsp->vgId = TD_VID(pVnode);
......@@ -512,7 +516,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
}
// do create table
if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) {
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
cRsp.code = TSDB_CODE_SUCCESS;
} else {
......@@ -522,6 +526,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid);
vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
}
taosArrayPush(rsp.pArray, &cRsp);
......@@ -550,7 +555,7 @@ _exit:
pCreateReq = req.pReqs + iReq;
taosArrayDestroy(pCreateReq->ctb.tagName);
}
taosArrayDestroy(rsp.pArray);
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
......@@ -862,7 +867,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, NULL) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno;
pRsp->code = terrno;
......
......@@ -270,13 +270,22 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
int32_t code = 0;
strcpy(output->dbFName, rspMsg->dbFName);
strcpy(output->tbName, rspMsg->tbName);
output->dbId = rspMsg->dbId;
SET_META_TYPE_TABLE(output->metaType);
if (TSDB_CHILD_TABLE == rspMsg->tableType && NULL == rspMsg->pSchemas) {
strcpy(output->ctbName, rspMsg->tbName);
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
SET_META_TYPE_CTABLE(output->metaType);
CTG_ERR_JRET(queryCreateCTableMetaFromMsg(rspMsg, &output->ctbMeta));
} else {
strcpy(output->tbName, rspMsg->tbName);
SET_META_TYPE_TABLE(output->metaType);
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
}
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp));
......
......@@ -135,7 +135,12 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf + *len, bufSize - *len, "%s", t);
int32_t tlen = strlen(t);
if (tlen > 32) {
*len += snprintf(buf + *len, bufSize - *len, "%.*s...%s", 32, t, t + tlen - 1);
} else {
*len += snprintf(buf + *len, bufSize - *len, "%s", t);
}
taosMemoryFree(t);
return TSDB_CODE_SUCCESS;
......@@ -199,12 +204,17 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
SNodeListNode *pListNode = (SNodeListNode *)pNode;
SNode *node = NULL;
bool first = true;
int32_t num = 0;
*len += snprintf(buf + *len, bufSize - *len, "(");
FOREACH(node, pListNode->pNodeList) {
if (!first) {
*len += snprintf(buf + *len, bufSize - *len, ", ");
if (++num >= 10) {
*len += snprintf(buf + *len, bufSize - *len, "...");
break;
}
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
......
......@@ -213,15 +213,25 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
return s;
}
void freeSTableMetaRspPointer(void *p) {
tFreeSTableMetaRsp(*(void**)p);
taosMemoryFreeClear(*(void**)p);
}
void destroyQueryExecRes(SExecResult* pRes) {
if (NULL == pRes || NULL == pRes->res) {
return;
}
switch (pRes->msgType) {
case TDMT_VND_CREATE_TABLE: {
taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
break;
}
case TDMT_MND_CREATE_STB:
case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: {
tFreeSTableMetaRsp((STableMetaRsp*)pRes->res);
tFreeSTableMetaRsp(pRes->res);
taosMemoryFreeClear(pRes->res);
break;
}
......
......@@ -354,6 +354,19 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) {
pMeta->vgId = msg->vgId;
pMeta->tableType = msg->tableType;
pMeta->uid = msg->tuid;
pMeta->suid = msg->suid;
qDebug("ctable %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s suid %" PRIx64 ,
msg->tbName, pMeta->uid, pMeta->tableType, pMeta->vgId, msg->dbFName, pMeta->suid);
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) {
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
......
......@@ -102,15 +102,30 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
}
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (rsp->pMeta) {
taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta);
}
if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) {
taosArrayDestroy((SArray*)pJob->execRes.res);
pJob->execRes.res = NULL;
}
}
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
......
......@@ -21,7 +21,7 @@
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
#define MAX_WARNING_REF_COUNT 10000
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_MAX_CAPACITY (1024 * 1024 * 1024)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
......@@ -67,6 +67,7 @@ struct SHashObj {
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
_hash_before_fn_t callbackFp; // function invoked before return the value to caller
int64_t compTimes;
};
/*
......@@ -146,6 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
uint32_t hashVal) {
SHashNode *pNode = pe->next;
while (pNode) {
atomic_add_fetch_64(&pHashObj->compTimes, 1);
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
......@@ -882,3 +884,7 @@ void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
void taosHashRelease(SHashObj *pHashObj, void *p) { taosHashCancelIterate(pHashObj, p); }
int64_t taosHashGetCompTimes(SHashObj *pHashObj) { return atomic_load_64(&pHashObj->compTimes); }
......@@ -197,6 +197,201 @@ void acquireRleaseTest() {
taosMemoryFreeClear(data.p);
}
void perfTest() {
SHashObj* hash1h = (SHashObj*) taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash1s = (SHashObj*) taosHashInit(1000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash10s = (SHashObj*) taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash100s = (SHashObj*) taosHashInit(100000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash1m = (SHashObj*) taosHashInit(1000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash10m = (SHashObj*) taosHashInit(10000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash100m = (SHashObj*) taosHashInit(100000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
char *name = (char*)taosMemoryCalloc(50000000, 9);
for (int64_t i = 0; i < 50000000; ++i) {
sprintf(name + i * 9, "t%08d", i);
}
for (int64_t i = 0; i < 50; ++i) {
taosHashPut(hash1h, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 500; ++i) {
taosHashPut(hash1s, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 5000; ++i) {
taosHashPut(hash10s, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 50000; ++i) {
taosHashPut(hash100s, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 500000; ++i) {
taosHashPut(hash1m, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 5000000; ++i) {
taosHashPut(hash10m, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 50000000; ++i) {
taosHashPut(hash100m, name + i * 9, 9, &i, sizeof(i));
}
int64_t start1h = taosGetTimestampMs();
int64_t start1hCt = taosHashGetCompTimes(hash1h);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash1h, name + (i % 50) * 9, 9));
}
int64_t end1h = taosGetTimestampMs();
int64_t end1hCt = taosHashGetCompTimes(hash1h);
int64_t start1s = taosGetTimestampMs();
int64_t start1sCt = taosHashGetCompTimes(hash1s);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash1s, name + (i % 500) * 9, 9));
}
int64_t end1s = taosGetTimestampMs();
int64_t end1sCt = taosHashGetCompTimes(hash1s);
int64_t start10s = taosGetTimestampMs();
int64_t start10sCt = taosHashGetCompTimes(hash10s);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash10s, name + (i % 5000) * 9, 9));
}
int64_t end10s = taosGetTimestampMs();
int64_t end10sCt = taosHashGetCompTimes(hash10s);
int64_t start100s = taosGetTimestampMs();
int64_t start100sCt = taosHashGetCompTimes(hash100s);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash100s, name + (i % 50000) * 9, 9));
}
int64_t end100s = taosGetTimestampMs();
int64_t end100sCt = taosHashGetCompTimes(hash100s);
int64_t start1m = taosGetTimestampMs();
int64_t start1mCt = taosHashGetCompTimes(hash1m);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash1m, name + (i % 500000) * 9, 9));
}
int64_t end1m = taosGetTimestampMs();
int64_t end1mCt = taosHashGetCompTimes(hash1m);
int64_t start10m = taosGetTimestampMs();
int64_t start10mCt = taosHashGetCompTimes(hash10m);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash10m, name + (i % 5000000) * 9, 9));
}
int64_t end10m = taosGetTimestampMs();
int64_t end10mCt = taosHashGetCompTimes(hash10m);
int64_t start100m = taosGetTimestampMs();
int64_t start100mCt = taosHashGetCompTimes(hash100m);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash100m, name + (i % 50000000) * 9, 9));
}
int64_t end100m = taosGetTimestampMs();
int64_t end100mCt = taosHashGetCompTimes(hash100m);
SArray *sArray[1000] = {0};
for (int64_t i = 0; i < 1000; ++i) {
sArray[i] = taosArrayInit(100000, 9);
}
int64_t cap = 4;
while (cap < 100000000) cap = (cap << 1u);
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t slotR = cap / 1000 + 1;
for (int64_t i = 0; i < 10000000; ++i) {
char* p = name + (i % 50000000) * 9;
uint32_t v = (*hashFp)(p, 9);
taosArrayPush(sArray[(v%cap)/slotR], p);
}
SArray *slArray = taosArrayInit(100000000, 9);
for (int64_t i = 0; i < 1000; ++i) {
int32_t num = taosArrayGetSize(sArray[i]);
SArray* pArray = sArray[i];
for (int64_t m = 0; m < num; ++m) {
char* p = (char*)taosArrayGet(pArray, m);
ASSERT(taosArrayPush(slArray, p));
}
}
int64_t start100mS = taosGetTimestampMs();
int64_t start100mSCt = taosHashGetCompTimes(hash100m);
int32_t num = taosArrayGetSize(slArray);
for (int64_t i = 0; i < num; ++i) {
ASSERT(taosHashGet(hash100m, (char*)TARRAY_GET_ELEM(slArray, i), 9));
}
int64_t end100mS = taosGetTimestampMs();
int64_t end100mSCt = taosHashGetCompTimes(hash100m);
for (int64_t i = 0; i < 1000; ++i) {
taosArrayDestroy(sArray[i]);
}
taosArrayDestroy(slArray);
printf("1h \t %" PRId64 "ms,%" PRId64 "\n", end1h - start1h, end1hCt - start1hCt);
printf("1s \t %" PRId64 "ms,%" PRId64 "\n", end1s - start1s, end1sCt - start1sCt);
printf("10s \t %" PRId64 "ms,%" PRId64 "\n", end10s - start10s, end10sCt - start10sCt);
printf("100s \t %" PRId64 "ms,%" PRId64 "\n", end100s - start100s, end100sCt - start100sCt);
printf("1m \t %" PRId64 "ms,%" PRId64 "\n", end1m - start1m, end1mCt - start1mCt);
printf("10m \t %" PRId64 "ms,%" PRId64 "\n", end10m - start10m, end10mCt - start10mCt);
printf("100m \t %" PRId64 "ms,%" PRId64 "\n", end100m - start100m, end100mCt - start100mCt);
printf("100mS \t %" PRId64 "ms,%" PRId64 "\n", end100mS - start100mS, end100mSCt - start100mSCt);
taosHashCleanup(hash1h);
taosHashCleanup(hash1s);
taosHashCleanup(hash10s);
taosHashCleanup(hash100s);
taosHashCleanup(hash1m);
taosHashCleanup(hash10m);
taosHashCleanup(hash100m);
SHashObj *mhash[1000] = {0};
for (int64_t i = 0; i < 1000; ++i) {
mhash[i] = (SHashObj*) taosHashInit(100000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
for (int64_t i = 0; i < 50000000; ++i) {
#if 0
taosHashPut(mhash[i%1000], name + i * 9, 9, &i, sizeof(i));
#else
taosHashPut(mhash[i/50000], name + i * 9, 9, &i, sizeof(i));
#endif
}
int64_t startMhashCt = 0;
for (int64_t i = 0; i < 1000; ++i) {
startMhashCt += taosHashGetCompTimes(mhash[i]);
}
int64_t startMhash = taosGetTimestampMs();
#if 0
for (int32_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(mhash[i%1000], name + i * 9, 9));
}
#else
// for (int64_t i = 0; i < 10000000; ++i) {
for (int64_t i = 0; i < 50000000; i+=5) {
ASSERT(taosHashGet(mhash[i/50000], name + i * 9, 9));
}
#endif
int64_t endMhash = taosGetTimestampMs();
int64_t endMhashCt = 0;
for (int64_t i = 0; i < 1000; ++i) {
printf(" %" PRId64 , taosHashGetCompTimes(mhash[i]));
endMhashCt += taosHashGetCompTimes(mhash[i]);
}
printf("\n100m \t %" PRId64 "ms,%" PRId64 "\n", endMhash - startMhash, endMhashCt - startMhashCt);
for (int64_t i = 0; i < 1000; ++i) {
taosHashCleanup(mhash[i]);
}
}
}
int main(int argc, char** argv) {
......@@ -210,4 +405,5 @@ TEST(testCase, hashTest) {
noLockPerformanceTest();
multithreadsTest();
acquireRleaseTest();
//perfTest();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册