未验证 提交 094962e7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9473 from taosdata/feature/vnode

make create table OK
...@@ -28,5 +28,5 @@ ...@@ -28,5 +28,5 @@
// "postCreateCommand": "gcc -v", // "postCreateCommand": "gcc -v",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode" "remoteUser": "root"
} }
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
#define tPutB(buf, val) \ #define tPutB(buf, val) \
({ \ ({ \
((uint8_t *)buf)[7] = ((val) >> 56) & 0xff; \
((uint8_t *)buf)[6] = ((val) >> 48) & 0xff; \
((uint8_t *)buf)[5] = ((val) >> 40) & 0xff; \
((uint8_t *)buf)[4] = ((val) >> 32) & 0xff; \
((uint8_t *)buf)[3] = ((val) >> 24) & 0xff; \ ((uint8_t *)buf)[3] = ((val) >> 24) & 0xff; \
((uint8_t *)buf)[2] = ((val) >> 16) & 0xff; \ ((uint8_t *)buf)[2] = ((val) >> 16) & 0xff; \
((uint8_t *)buf)[1] = ((val) >> 8) & 0xff; \ ((uint8_t *)buf)[1] = ((val) >> 8) & 0xff; \
...@@ -21,13 +25,33 @@ ...@@ -21,13 +25,33 @@
POINTER_SHIFT(buf, sizeof(val)); \ POINTER_SHIFT(buf, sizeof(val)); \
}) })
#define tPutC(buf, val) \ #define tPutC(buf, val) \
({ \ ({ \
((uint64_t *)buf)[0] = (val); \ if (buf) { \
POINTER_SHIFT(buf, sizeof(val)); \ ((uint64_t *)buf)[0] = (val); \
POINTER_SHIFT(buf, sizeof(val)); \
} \
NULL; \
}) })
typedef enum { A, B, C } T; #define tPutD(buf, val) \
({ \
uint64_t tmp = val; \
for (size_t i = 0; i < sizeof(val); i++) { \
((uint8_t *)buf)[i] = tmp & 0xff; \
tmp >>= 8; \
} \
POINTER_SHIFT(buf, sizeof(val)); \
})
static inline void tPutE(void **buf, uint64_t val) {
if (buf) {
((uint64_t *)(*buf))[0] = val;
*buf = POINTER_SHIFT(*buf, sizeof(val));
}
}
typedef enum { A, B, C, D, E } T;
static void func(T t) { static void func(T t) {
uint64_t val = 198; uint64_t val = 198;
...@@ -59,6 +83,22 @@ static void func(T t) { ...@@ -59,6 +83,22 @@ static void func(T t) {
} }
} }
break; break;
case D:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
pBuf = tPutD(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
}
break;
case E:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
tPutE(&pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
}
break;
default: default:
break; break;
...@@ -83,5 +123,11 @@ int main(int argc, char const *argv[]) { ...@@ -83,5 +123,11 @@ int main(int argc, char const *argv[]) {
func(C); func(C);
uint64_t t4 = now(); uint64_t t4 = now();
printf("C: %ld\n", t4 - t3); printf("C: %ld\n", t4 - t3);
func(D);
uint64_t t5 = now();
printf("D: %ld\n", t5 - t4);
func(E);
uint64_t t6 = now();
printf("E: %ld\n", t6 - t5);
return 0; return 0;
} }
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "encode.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "tcoding.h" #include "tcoding.h"
...@@ -299,12 +300,12 @@ typedef struct SEpSet { ...@@ -299,12 +300,12 @@ typedef struct SEpSet {
} SEpSet; } SEpSet;
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
int32_t connId; int32_t connId;
int8_t superUser; int8_t superUser;
int8_t reserved[5]; int8_t reserved[5];
SEpSet epSet; SEpSet epSet;
} SConnectRsp; } SConnectRsp;
typedef struct { typedef struct {
...@@ -1057,9 +1058,9 @@ typedef struct STaskDropRsp { ...@@ -1057,9 +1058,9 @@ typedef struct STaskDropRsp {
} STaskDropRsp; } STaskDropRsp;
typedef struct { typedef struct {
int8_t igExists; int8_t igExists;
char* name; char* name;
char* phyPlan; char* phyPlan;
} SCMCreateTopicReq; } SCMCreateTopicReq;
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
#include "mallocator.h" #include "mallocator.h"
#include "os.h" #include "os.h"
#include "trow.h"
#include "tmsg.h" #include "tmsg.h"
#include "trow.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -47,35 +47,13 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); ...@@ -47,35 +47,13 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta); int metaCommit(SMeta *pMeta);
// For Query
int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg);
// Options // Options
void metaOptionsInit(SMetaCfg *pMetaCfg); void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pMetaCfg); void metaOptionsClear(SMetaCfg *pMetaCfg);
// STbCfg
#define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \
{ \
.name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_SUPER_TABLE, .stbCfg = { \
.suid = (SUID), \
.pSchema = (PSCHEMA), \
.pTagSchema = (PTAGSCHEMA) \
} \
}
#define META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) \
{ \
.name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_CHILD_TABLE, .ctbCfg = {.suid = (SUID), .pTag = PTAG } \
}
#define META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) \
{ \
.name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_NORMAL_TABLE, .ntbCfg = {.pSchema = (PSCHEMA) } \
}
#define META_CLEAR_TB_CFG(pTbCfg)
int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg);
void *metaDecodeTbCfg(void *pBuf, STbCfg *pTbCfg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -187,68 +187,6 @@ void vnodeOptionsInit(SVnodeCfg *pOptions); ...@@ -187,68 +187,6 @@ void vnodeOptionsInit(SVnodeCfg *pOptions);
*/ */
void vnodeOptionsClear(SVnodeCfg *pOptions); void vnodeOptionsClear(SVnodeCfg *pOptions);
/* ------------------------ REQUESTS ------------------------ */
typedef STbCfg SVCreateTableReq;
typedef struct {
tb_uid_t uid;
} SVDropTableReq;
typedef struct {
// TODO
} SVSubmitReq;
typedef struct {
uint64_t ver;
union {
SVCreateTableReq ctReq;
SVDropTableReq dtReq;
};
} SVnodeReq;
typedef struct {
int err;
char info[];
} SVnodeRsp;
static FORCE_INLINE void vnodeSetCreateStbReq(SVnodeReq *pReq, char *name, uint32_t ttl, uint32_t keep, tb_uid_t suid,
STSchema *pSchema, STSchema *pTagSchema) {
pReq->ver = 0;
pReq->ctReq.name = name;
pReq->ctReq.ttl = ttl;
pReq->ctReq.keep = keep;
pReq->ctReq.type = META_SUPER_TABLE;
pReq->ctReq.stbCfg.suid = suid;
pReq->ctReq.stbCfg.pSchema = pSchema;
pReq->ctReq.stbCfg.pTagSchema = pTagSchema;
}
static FORCE_INLINE void vnodeSetCreateCtbReq(SVnodeReq *pReq, char *name, uint32_t ttl, uint32_t keep, tb_uid_t suid,
SKVRow pTag) {
pReq->ver = 0;
pReq->ctReq.name = name;
pReq->ctReq.ttl = ttl;
pReq->ctReq.keep = keep;
pReq->ctReq.type = META_CHILD_TABLE;
pReq->ctReq.ctbCfg.suid = suid;
pReq->ctReq.ctbCfg.pTag = pTag;
}
static FORCE_INLINE void vnodeSetCreateNtbReq(SVnodeReq *pReq, char *name, uint32_t ttl, uint32_t keep,
STSchema *pSchema) {
pReq->ver = 0;
pReq->ctReq.name = name;
pReq->ctReq.ttl = ttl;
pReq->ctReq.keep = keep;
pReq->ctReq.type = META_NORMAL_TABLE;
pReq->ctReq.ntbCfg.pSchema = pSchema;
}
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type);
void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type);
/* ------------------------ FOR COMPILE ------------------------ */ /* ------------------------ FOR COMPILE ------------------------ */
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
......
...@@ -20,8 +20,11 @@ ...@@ -20,8 +20,11 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { TD_LITTLE_ENDIAN = 0, TD_BIG_ENDIAN } td_endian_t;
static const int32_t endian_test_var = 1; static const int32_t endian_test_var = 1;
#define IS_LITTLE_ENDIAN() (*(uint8_t *)(&endian_test_var) != 0) #define IS_LITTLE_ENDIAN() (*(uint8_t *)(&endian_test_var) != 0)
#define TD_RT_ENDIAN() (IS_LITTLE_ENDIAN() ? TD_LITTLE_ENDIAN : TD_BIG_ENDIAN)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_ENCODE_H_
#define _TD_UTIL_ENCODE_H_
#include "tcoding.h"
#include "tmacro.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
td_endian_t endian;
uint8_t* data;
int64_t size;
int64_t pos;
} SEncoder, SDecoder;
#define tPut(TYPE, BUF, VAL) ((TYPE*)(BUF))[0] = (VAL)
#define tGet(TYPE, BUF, VAL) (VAL) = ((TYPE*)(BUF))[0]
#define tRPut16(PDEST, PSRC) \
((uint8_t*)(PDEST))[0] = ((uint8_t*)(PSRC))[1]; \
((uint8_t*)(PDEST))[1] = ((uint8_t*)(PSRC))[0];
#define tRPut32(PDEST, PSRC) \
((uint8_t*)(PDEST))[0] = ((uint8_t*)(PSRC))[3]; \
((uint8_t*)(PDEST))[1] = ((uint8_t*)(PSRC))[2]; \
((uint8_t*)(PDEST))[2] = ((uint8_t*)(PSRC))[1]; \
((uint8_t*)(PDEST))[3] = ((uint8_t*)(PSRC))[0];
#define tRPut64(PDEST, PSRC) \
((uint8_t*)(PDEST))[0] = ((uint8_t*)(PSRC))[7]; \
((uint8_t*)(PDEST))[1] = ((uint8_t*)(PSRC))[6]; \
((uint8_t*)(PDEST))[2] = ((uint8_t*)(PSRC))[5]; \
((uint8_t*)(PDEST))[3] = ((uint8_t*)(PSRC))[4]; \
((uint8_t*)(PDEST))[4] = ((uint8_t*)(PSRC))[3]; \
((uint8_t*)(PDEST))[5] = ((uint8_t*)(PSRC))[2]; \
((uint8_t*)(PDEST))[6] = ((uint8_t*)(PSRC))[1]; \
((uint8_t*)(PDEST))[7] = ((uint8_t*)(PSRC))[0];
#define tRGet16 tRPut16
#define tRGet32 tRPut32
#define tRGet64 tRPut64
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
#define TD_CHECK_CODER_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
/* ------------------------ FOR ENCODER ------------------------ */
static FORCE_INLINE void tInitEncoder(SEncoder* pEncoder, td_endian_t endian, uint8_t* data, int64_t size) {
pEncoder->endian = endian;
pEncoder->data = data;
pEncoder->size = (data) ? size : 0;
pEncoder->pos = 0;
}
// 8
static FORCE_INLINE int tEncodeU8(SEncoder* pEncoder, uint8_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
tPut(uint8_t, TD_CODER_CURRENT(pEncoder), val);
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
static FORCE_INLINE int tEncodeI8(SEncoder* pEncoder, int8_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
tPut(int8_t, TD_CODER_CURRENT(pEncoder), val);
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
// 16
static FORCE_INLINE int tEncodeU16(SEncoder* pEncoder, uint16_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
if (TD_RT_ENDIAN() == pEncoder->endian) {
tPut(uint16_t, TD_CODER_CURRENT(pEncoder), val);
} else {
tRPut16(TD_CODER_CURRENT(pEncoder), &val);
}
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
static FORCE_INLINE int tEncodeI16(SEncoder* pEncoder, int16_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
if (TD_RT_ENDIAN() == pEncoder->endian) {
tPut(int16_t, TD_CODER_CURRENT(pEncoder), val);
} else {
tRPut16(TD_CODER_CURRENT(pEncoder), &val);
}
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
// 32
static FORCE_INLINE int tEncodeU32(SEncoder* pEncoder, uint32_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
if (TD_RT_ENDIAN() == pEncoder->endian) {
tPut(uint32_t, TD_CODER_CURRENT(pEncoder), val);
} else {
tRPut32(TD_CODER_CURRENT(pEncoder), &val);
}
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
static FORCE_INLINE int tEncodeI32(SEncoder* pEncoder, int32_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
if (TD_RT_ENDIAN() == pEncoder->endian) {
tPut(int32_t, TD_CODER_CURRENT(pEncoder), val);
} else {
tRPut32(TD_CODER_CURRENT(pEncoder), &val);
}
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
// 64
static FORCE_INLINE int tEncodeU64(SEncoder* pEncoder, uint64_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
if (TD_RT_ENDIAN() == pEncoder->endian) {
tPut(uint64_t, TD_CODER_CURRENT(pEncoder), val);
} else {
tRPut64(TD_CODER_CURRENT(pEncoder), &val);
}
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
static FORCE_INLINE int tEncodeI64(SEncoder* pEncoder, int64_t val) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, sizeof(val))) return -1;
if (TD_RT_ENDIAN() == pEncoder->endian) {
tPut(int64_t, TD_CODER_CURRENT(pEncoder), val);
} else {
tRPut64(TD_CODER_CURRENT(pEncoder), &val);
}
}
TD_CODER_MOVE_POS(pEncoder, sizeof(val));
return 0;
}
// 16v
static FORCE_INLINE int tEncodeU16v(SEncoder* pEncoder, uint16_t val) {
int64_t i = 0;
while (val >= ENCODE_LIMIT) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, 1)) return -1;
TD_CODER_CURRENT(pEncoder)[i] = (val | ENCODE_LIMIT) & 0xff;
}
val >>= 7;
i++;
}
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, 1)) return -1;
TD_CODER_CURRENT(pEncoder)[i] = (uint8_t)val;
}
TD_CODER_MOVE_POS(pEncoder, i + 1);
return 0;
}
static FORCE_INLINE int tEncodeI16v(SEncoder* pEncoder, int16_t val) {
return tEncodeU16v(pEncoder, ZIGZAGE(int16_t, val));
}
// 32v
static FORCE_INLINE int tEncodeU32v(SEncoder* pEncoder, uint32_t val) {
int64_t i = 0;
while (val >= ENCODE_LIMIT) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, 1)) return -1;
TD_CODER_CURRENT(pEncoder)[i] = (val | ENCODE_LIMIT) & 0xff;
}
val >>= 7;
i++;
}
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, 1)) return -1;
TD_CODER_CURRENT(pEncoder)[i] = (uint8_t)val;
}
TD_CODER_MOVE_POS(pEncoder, i + 1);
return 0;
}
static FORCE_INLINE int tEncodeI32v(SEncoder* pEncoder, int32_t val) {
return tEncodeU32v(pEncoder, ZIGZAGE(int32_t, val));
}
// 64v
static FORCE_INLINE int tEncodeU64v(SEncoder* pEncoder, uint64_t val) {
int64_t i = 0;
while (val >= ENCODE_LIMIT) {
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, 1)) return -1;
TD_CODER_CURRENT(pEncoder)[i] = (val | ENCODE_LIMIT) & 0xff;
}
val >>= 7;
i++;
}
if (pEncoder->data) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pEncoder, 1)) return -1;
TD_CODER_CURRENT(pEncoder)[i] = (uint8_t)val;
}
TD_CODER_MOVE_POS(pEncoder, i + 1);
return 0;
}
static FORCE_INLINE int tEncodeI64v(SEncoder* pEncoder, int64_t val) {
return tEncodeU64v(pEncoder, ZIGZAGE(int64_t, val));
}
static FORCE_INLINE int tEncodeFloat(SEncoder* pEncoder, float val) {
// TODO
return 0;
}
static FORCE_INLINE int tEncodeDouble(SEncoder* pEncoder, double val) {
// TODO
return 0;
}
static FORCE_INLINE int tEncodeCStr(SEncoder* pEncoder, const char* val) {
// TODO
return 0;
}
/* ------------------------ FOR DECODER ------------------------ */
static FORCE_INLINE void tInitDecoder(SDecoder* pDecoder, td_endian_t endian, uint8_t* data, int64_t size) {
ASSERT(!TD_IS_NULL(data));
pDecoder->endian = endian;
pDecoder->data = data;
pDecoder->size = size;
pDecoder->pos = 0;
}
// 8
static FORCE_INLINE int tDecodeU8(SDecoder* pDecoder, uint8_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
tGet(uint8_t, TD_CODER_CURRENT(pDecoder), *val);
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
static FORCE_INLINE int tDecodeI8(SDecoder* pDecoder, int8_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
tGet(int8_t, TD_CODER_CURRENT(pDecoder), *val);
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
// 16
static FORCE_INLINE int tDecodeU16(SDecoder* pDecoder, uint16_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
if (TD_RT_ENDIAN() == pDecoder->endian) {
tGet(uint16_t, TD_CODER_CURRENT(pDecoder), *val);
} else {
tRGet16(val, TD_CODER_CURRENT(pDecoder));
}
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
static FORCE_INLINE int tDecodeI16(SDecoder* pDecoder, int16_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
if (TD_RT_ENDIAN() == pDecoder->endian) {
tGet(int16_t, TD_CODER_CURRENT(pDecoder), *val);
} else {
tRGet16(val, TD_CODER_CURRENT(pDecoder));
}
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
// 32
static FORCE_INLINE int tDecodeU32(SDecoder* pDecoder, uint32_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
if (TD_RT_ENDIAN() == pDecoder->endian) {
tGet(uint32_t, TD_CODER_CURRENT(pDecoder), *val);
} else {
tRGet32(val, TD_CODER_CURRENT(pDecoder));
}
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
static FORCE_INLINE int tDecodeI32(SDecoder* pDecoder, int32_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
if (TD_RT_ENDIAN() == pDecoder->endian) {
tGet(int32_t, TD_CODER_CURRENT(pDecoder), *val);
} else {
tRGet32(val, TD_CODER_CURRENT(pDecoder));
}
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
// 64
static FORCE_INLINE int tDecodeU64(SDecoder* pDecoder, uint64_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
if (TD_RT_ENDIAN() == pDecoder->endian) {
tGet(uint64_t, TD_CODER_CURRENT(pDecoder), *val);
} else {
tRGet64(val, TD_CODER_CURRENT(pDecoder));
}
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
static FORCE_INLINE int tDecodeI64(SDecoder* pDecoder, int64_t* val) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, sizeof(*val))) return -1;
if (TD_RT_ENDIAN() == pDecoder->endian) {
tGet(int64_t, TD_CODER_CURRENT(pDecoder), *val);
} else {
tRGet64(val, TD_CODER_CURRENT(pDecoder));
}
TD_CODER_MOVE_POS(pDecoder, sizeof(*val));
return 0;
}
// 16v
static FORCE_INLINE int tDecodeU16v(SDecoder* pDecoder, uint16_t* val) {
int64_t i = 0;
*val = 0;
for (;;) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, 1)) return -1;
uint16_t tval = TD_CODER_CURRENT(pDecoder)[i];
if (tval < ENCODE_LIMIT) {
(*val) |= (tval << (7 * i));
break;
} else {
(*val) |= (((tval) & (ENCODE_LIMIT - 1)) << (7 * i));
i++;
}
}
TD_CODER_MOVE_POS(pDecoder, i);
return 0;
}
static FORCE_INLINE int tDecodeI16v(SDecoder* pDecoder, int16_t* val) {
uint16_t tval;
if (tDecodeU16v(pDecoder, &tval) < 0) {
return -1;
}
*val = ZIGZAGD(int16_t, tval);
return 0;
}
// 32v
static FORCE_INLINE int tDecodeU32v(SDecoder* pDecoder, uint32_t* val) {
int64_t i = 0;
*val = 0;
for (;;) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, 1)) return -1;
uint32_t tval = TD_CODER_CURRENT(pDecoder)[i];
if (tval < ENCODE_LIMIT) {
(*val) |= (tval << (7 * i));
break;
} else {
(*val) |= (((tval) & (ENCODE_LIMIT - 1)) << (7 * i));
i++;
}
}
TD_CODER_MOVE_POS(pDecoder, i);
return 0;
}
static FORCE_INLINE int tDecodeI32v(SDecoder* pDecoder, int32_t* val) {
uint32_t tval;
if (tDecodeU32v(pDecoder, &tval) < 0) {
return -1;
}
*val = ZIGZAGD(int32_t, tval);
return 0;
}
// 64v
static FORCE_INLINE int tDecodeU64v(SDecoder* pDecoder, uint64_t* val) {
int64_t i = 0;
*val = 0;
for (;;) {
if (TD_CHECK_CODER_CAPACITY_FAILED(pDecoder, 1)) return -1;
uint64_t tval = TD_CODER_CURRENT(pDecoder)[i];
if (tval < ENCODE_LIMIT) {
(*val) |= (tval << (7 * i));
break;
} else {
(*val) |= (((tval) & (ENCODE_LIMIT - 1)) << (7 * i));
i++;
}
}
TD_CODER_MOVE_POS(pDecoder, i);
return 0;
}
static FORCE_INLINE int tDecodeI64v(SDecoder* pDecoder, int64_t* val) {
uint64_t tval;
if (tDecodeU64v(pDecoder, &tval) < 0) {
return -1;
}
*val = ZIGZAGD(int64_t, tval);
return 0;
}
static FORCE_INLINE int tDecodeFloat(SDecoder* pDecoder, float* val) {
// TODO
return 0;
}
static FORCE_INLINE int tDecodeDouble(SDecoder* pDecoder, double* val) {
// TODO
return 0;
}
static FORCE_INLINE int tDecodeCStr(SDecoder* pEncoder, const char** val) {
// TODO
return 0;
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_ENCODE_H_*/
\ No newline at end of file
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_BUFFER_H #ifndef _TD_UTIL_BUFFER_H
#define _TD_UTIL_BUFFER_H #define _TD_UTIL_BUFFER_H
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -71,102 +73,95 @@ int main( int argc, char** argv ) { ...@@ -71,102 +73,95 @@ int main( int argc, char** argv ) {
*/ */
typedef struct SBufferReader { typedef struct SBufferReader {
bool endian; bool endian;
const char* data; const char* data;
size_t pos; size_t pos;
size_t size; size_t size;
} SBufferReader; } SBufferReader;
typedef struct SBufferWriter { typedef struct SBufferWriter {
bool endian; bool endian;
char* data; char* data;
size_t pos; size_t pos;
size_t size; size_t size;
void* (*allocator)( void*, size_t ); void* (*allocator)(void*, size_t);
} SBufferWriter; } SBufferWriter;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// common functions & macros for both reader & writer // common functions & macros for both reader & writer
#define tbufTell( buf ) ((buf)->pos) #define tbufTell(buf) ((buf)->pos)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// reader functions & macros /* ------------------------ BUFFER WRITER FUNCTIONS AND MACROS ------------------------ */
// *Endian*, if true, reader functions of primitive types will do 'ntoh' automatically
#define tbufInitReader( Data, Size, Endian ) {.endian = (Endian), .data = (Data), .pos = 0, .size = ((Data) == NULL ? 0 :(Size))}
size_t tbufSkip( SBufferReader* buf, size_t size );
const char* tbufRead( SBufferReader* buf, size_t size );
void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size );
const char* tbufReadString( SBufferReader* buf, size_t* len );
size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size );
const char* tbufReadBinary( SBufferReader* buf, size_t *len );
size_t tbufReadToBinary( SBufferReader* buf, void* dst, size_t size );
bool tbufReadBool( SBufferReader* buf );
char tbufReadChar( SBufferReader* buf );
int8_t tbufReadInt8( SBufferReader* buf );
uint8_t tbufReadUint8( SBufferReader* buf );
int16_t tbufReadInt16( SBufferReader* buf );
uint16_t tbufReadUint16( SBufferReader* buf );
int32_t tbufReadInt32( SBufferReader* buf );
uint32_t tbufReadUint32( SBufferReader* buf );
int64_t tbufReadInt64( SBufferReader* buf );
uint64_t tbufReadUint64( SBufferReader* buf );
float tbufReadFloat( SBufferReader* buf );
double tbufReadDouble( SBufferReader* buf );
////////////////////////////////////////////////////////////////////////////////
// writer functions & macros
// *Allocator*, function to allocate memory, will use 'realloc' if NULL // *Allocator*, function to allocate memory, will use 'realloc' if NULL
// *Endian*, if true, writer functions of primitive types will do 'hton' automatically // *Endian*, if true, writer functions of primitive types will do 'hton' automatically
#define tbufInitWriter( Allocator, Endian ) {.endian = (Endian), .data = NULL, .pos = 0, .size = 0, .allocator = ((Allocator) == NULL ? realloc : (Allocator))} #define tbufInitWriter(Allocator, Endian) \
void tbufCloseWriter( SBufferWriter* buf ); { .endian = (Endian), .data = NULL, .pos = 0, .size = 0, .allocator = ((Allocator) == NULL ? realloc : (Allocator)) }
void tbufEnsureCapacity( SBufferWriter* buf, size_t size ); void tbufCloseWriter(SBufferWriter* buf);
size_t tbufReserve( SBufferWriter* buf, size_t size ); void tbufEnsureCapacity(SBufferWriter* buf, size_t size);
char* tbufGetData( SBufferWriter* buf, bool takeOver ); size_t tbufReserve(SBufferWriter* buf, size_t size);
char* tbufGetData(SBufferWriter* buf, bool takeOver);
void tbufWrite( SBufferWriter* buf, const void* data, size_t size ); void tbufWrite(SBufferWriter* buf, const void* data, size_t size);
void tbufWriteAt( SBufferWriter* buf, size_t pos, const void* data, size_t size ); void tbufWriteAt(SBufferWriter* buf, size_t pos, const void* data, size_t size);
void tbufWriteStringLen( SBufferWriter* buf, const char* str, size_t len ); void tbufWriteStringLen(SBufferWriter* buf, const char* str, size_t len);
void tbufWriteString( SBufferWriter* buf, const char* str ); void tbufWriteString(SBufferWriter* buf, const char* str);
// the prototype of tbufWriteBinary and tbufWrite are identical // the prototype of tbufWriteBinary and tbufWrite are identical
// the difference is: tbufWriteBinary writes the length of the data to the buffer // the difference is: tbufWriteBinary writes the length of the data to the buffer
// first, then the actual data, which means the reader don't need to know data // first, then the actual data, which means the reader don't need to know data
// size before read. Write only write the data itself, which means the reader // size before read. Write only write the data itself, which means the reader
// need to know data size before read. // need to know data size before read.
void tbufWriteBinary( SBufferWriter* buf, const void* data, size_t len ); void tbufWriteBinary(SBufferWriter* buf, const void* data, size_t len);
void tbufWriteBool(SBufferWriter* buf, bool data);
void tbufWriteBool( SBufferWriter* buf, bool data ); void tbufWriteBoolAt(SBufferWriter* buf, size_t pos, bool data);
void tbufWriteBoolAt( SBufferWriter* buf, size_t pos, bool data ); void tbufWriteChar(SBufferWriter* buf, char data);
void tbufWriteChar( SBufferWriter* buf, char data ); void tbufWriteCharAt(SBufferWriter* buf, size_t pos, char data);
void tbufWriteCharAt( SBufferWriter* buf, size_t pos, char data ); void tbufWriteInt8(SBufferWriter* buf, int8_t data);
void tbufWriteInt8( SBufferWriter* buf, int8_t data ); void tbufWriteInt8At(SBufferWriter* buf, size_t pos, int8_t data);
void tbufWriteInt8At( SBufferWriter* buf, size_t pos, int8_t data ); void tbufWriteUint8(SBufferWriter* buf, uint8_t data);
void tbufWriteUint8( SBufferWriter* buf, uint8_t data ); void tbufWriteUint8At(SBufferWriter* buf, size_t pos, uint8_t data);
void tbufWriteUint8At( SBufferWriter* buf, size_t pos, uint8_t data ); void tbufWriteInt16(SBufferWriter* buf, int16_t data);
void tbufWriteInt16( SBufferWriter* buf, int16_t data ); void tbufWriteInt16At(SBufferWriter* buf, size_t pos, int16_t data);
void tbufWriteInt16At( SBufferWriter* buf, size_t pos, int16_t data ); void tbufWriteUint16(SBufferWriter* buf, uint16_t data);
void tbufWriteUint16( SBufferWriter* buf, uint16_t data ); void tbufWriteUint16At(SBufferWriter* buf, size_t pos, uint16_t data);
void tbufWriteUint16At( SBufferWriter* buf, size_t pos, uint16_t data ); void tbufWriteInt32(SBufferWriter* buf, int32_t data);
void tbufWriteInt32( SBufferWriter* buf, int32_t data ); void tbufWriteInt32At(SBufferWriter* buf, size_t pos, int32_t data);
void tbufWriteInt32At( SBufferWriter* buf, size_t pos, int32_t data ); void tbufWriteUint32(SBufferWriter* buf, uint32_t data);
void tbufWriteUint32( SBufferWriter* buf, uint32_t data ); void tbufWriteUint32At(SBufferWriter* buf, size_t pos, uint32_t data);
void tbufWriteUint32At( SBufferWriter* buf, size_t pos, uint32_t data ); void tbufWriteInt64(SBufferWriter* buf, int64_t data);
void tbufWriteInt64( SBufferWriter* buf, int64_t data ); void tbufWriteInt64At(SBufferWriter* buf, size_t pos, int64_t data);
void tbufWriteInt64At( SBufferWriter* buf, size_t pos, int64_t data ); void tbufWriteUint64(SBufferWriter* buf, uint64_t data);
void tbufWriteUint64( SBufferWriter* buf, uint64_t data ); void tbufWriteUint64At(SBufferWriter* buf, size_t pos, uint64_t data);
void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ); void tbufWriteFloat(SBufferWriter* buf, float data);
void tbufWriteFloat( SBufferWriter* buf, float data ); void tbufWriteFloatAt(SBufferWriter* buf, size_t pos, float data);
void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ); void tbufWriteDouble(SBufferWriter* buf, double data);
void tbufWriteDouble( SBufferWriter* buf, double data ); void tbufWriteDoubleAt(SBufferWriter* buf, size_t pos, double data);
void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data );
/* ------------------------ BUFFER READER FUNCTIONS AND MACROS ------------------------ */
// *Endian*, if true, reader functions of primitive types will do 'ntoh' automatically
#define tbufInitReader(Data, Size, Endian) \
{ .endian = (Endian), .data = (Data), .pos = 0, .size = ((Data) == NULL ? 0 : (Size)) }
size_t tbufSkip(SBufferReader* buf, size_t size);
const char* tbufRead(SBufferReader* buf, size_t size);
void tbufReadToBuffer(SBufferReader* buf, void* dst, size_t size);
const char* tbufReadString(SBufferReader* buf, size_t* len);
size_t tbufReadToString(SBufferReader* buf, char* dst, size_t size);
const char* tbufReadBinary(SBufferReader* buf, size_t* len);
size_t tbufReadToBinary(SBufferReader* buf, void* dst, size_t size);
bool tbufReadBool(SBufferReader* buf);
char tbufReadChar(SBufferReader* buf);
int8_t tbufReadInt8(SBufferReader* buf);
uint8_t tbufReadUint8(SBufferReader* buf);
int16_t tbufReadInt16(SBufferReader* buf);
uint16_t tbufReadUint16(SBufferReader* buf);
int32_t tbufReadInt32(SBufferReader* buf);
uint32_t tbufReadUint32(SBufferReader* buf);
int64_t tbufReadInt64(SBufferReader* buf);
uint64_t tbufReadUint64(SBufferReader* buf);
float tbufReadFloat(SBufferReader* buf);
double tbufReadDouble(SBufferReader* buf);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,6 +25,8 @@ extern "C" { ...@@ -25,6 +25,8 @@ extern "C" {
#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode #define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode
#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode #define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode
/* ------------------------ LEGACY CODES ------------------------ */
#if 1
// ---- Fixed U8 // ---- Fixed U8
static FORCE_INLINE int taosEncodeFixedU8(void **buf, uint8_t value) { static FORCE_INLINE int taosEncodeFixedU8(void **buf, uint8_t value) {
if (buf != NULL) { if (buf != NULL) {
...@@ -368,6 +370,8 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) { ...@@ -368,6 +370,8 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
return POINTER_SHIFT(buf, size); return POINTER_SHIFT(buf, size);
} }
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -35,6 +35,8 @@ typedef int8_t td_mode_flag_t; ...@@ -35,6 +35,8 @@ typedef int8_t td_mode_flag_t;
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD) #define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#define TD_IS_NULL(PTR) ((PTR) == NULL)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
#endif #endif
// SVDropTableReq // SVDropTableReq
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq); // int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq);
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq); // void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,12 +13,10 @@ ...@@ -13,12 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "vnodeDef.h"
#include "vnodeQuery.h" #include "vnodeQuery.h"
#include "vnodeDef.h"
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
return qWorkerInit(NULL, &pVnode->pQuery);
}
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("query message is processed"); vInfo("query message is processed");
...@@ -46,4 +44,24 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -46,4 +44,24 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return 0; return 0;
} }
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STableInfoMsg *pReq = (STableInfoMsg *)(pMsg->pCont);
STableMetaMsg *pRspMsg;
int ret;
if (metaGetTableInfo(pVnode->pMeta, pReq->tableFname, &pRspMsg) < 0) {
return -1;
}
*pRsp = malloc(sizeof(SRpcMsg));
if (TD_IS_NULL(*pRsp)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(pMsg);
return -1;
}
// TODO
(*pRsp)->pCont = pRspMsg;
return 0;
}
\ No newline at end of file
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#include "vnodeDef.h" #include "vnodeDef.h"
#if 0
static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq); static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq);
static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq); static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq);
...@@ -113,4 +115,5 @@ int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) { ...@@ -113,4 +115,5 @@ int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) {
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) { void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) {
// TODO // TODO
} }
\ No newline at end of file #endif
\ No newline at end of file
...@@ -28,7 +28,6 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -28,7 +28,6 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg * pMsg; SRpcMsg * pMsg;
SVnodeReq *pVnodeReq;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
...@@ -51,7 +50,6 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -51,7 +50,6 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVnodeReq vReq;
SVCreateTbReq vCreateTbReq; SVCreateTbReq vCreateTbReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen); void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) { if (ptr == NULL) {
...@@ -70,6 +68,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -70,6 +68,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB: case TDMT_VND_CREATE_STB:
case TDMT_VND_CREATE_TABLE:
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error // TODO: handle error
...@@ -79,9 +78,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -79,9 +78,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
break; break;
case TDMT_VND_DROP_STB: case TDMT_VND_DROP_STB:
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// TODO: handle error // // TODO: handle error
} // }
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) { if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) {
......
...@@ -351,7 +351,7 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey ...@@ -351,7 +351,7 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
// Second key is the first tag // Second key is the first tag
void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, 0); void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
pDbt[1].data = varDataVal(pTagVal); pDbt[1].data = varDataVal(pTagVal);
pDbt[1].size = varDataLen(pTagVal); pDbt[1].size = varDataLen(pTagVal);
...@@ -403,10 +403,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { ...@@ -403,10 +403,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); buf = taosDecodeFixedU8(buf, &(pTbCfg->type));
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
buf = taosDecodeVariantU32(buf, pTbCfg->stbCfg.nTagCols); buf = taosDecodeVariantU32(buf, &(pTbCfg->stbCfg.nTagCols));
pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols); pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols);
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) {
buf = taosDecodeFixedI8(buf, &pTbCfg->stbCfg.pSchema[i].type); buf = taosDecodeFixedI8(buf, &(pTbCfg->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pSchema[i].colId); buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pSchema[i].name);
...@@ -428,4 +428,82 @@ static void metaClearTbCfg(STbCfg *pTbCfg) { ...@@ -428,4 +428,82 @@ static void metaClearTbCfg(STbCfg *pTbCfg) {
} else if (pTbCfg->type == META_CHILD_TABLE) { } else if (pTbCfg->type == META_CHILD_TABLE) {
tfree(pTbCfg->ctbCfg.pTag); tfree(pTbCfg->ctbCfg.pTag);
} }
}
/* ------------------------ FOR QUERY ------------------------ */
int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) {
DBT key = {0};
DBT value = {0};
SMetaDB * pMetaDB = pMeta->pDB;
int ret;
STbCfg tbCfg;
SSchemaKey schemaKey;
DBT key1 = {0};
DBT value1 = {0};
uint32_t ncols;
void * pBuf;
int tlen;
STableMetaMsg *pMsg;
key.data = tbname;
key.size = strlen(tbname) + 1;
ret = pMetaDB->pNameIdx->get(pMetaDB->pNameIdx, NULL, &key, &value, 0);
if (ret != 0) {
// TODO
return -1;
}
metaDecodeTbInfo(value.data, &tbCfg);
switch (tbCfg.type) {
case META_SUPER_TABLE:
schemaKey.uid = tbCfg.stbCfg.suid;
schemaKey.sver = 0;
key1.data = &schemaKey;
key1.size = sizeof(schemaKey);
ret = pMetaDB->pSchemaDB->get(pMetaDB->pSchemaDB, &key1, &value1, NULL, 0);
if (ret != 0) {
// TODO
return -1;
}
pBuf = value1.data;
pBuf = taosDecodeFixedU32(pBuf, &ncols);
tlen = sizeof(STableMetaMsg) + (tbCfg.stbCfg.nTagCols + ncols) * sizeof(SSchema);
pMsg = calloc(1, tlen);
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pMsg->tbFname, tbCfg.name);
pMsg->numOfTags = tbCfg.stbCfg.nTagCols;
pMsg->numOfColumns = ncols;
pMsg->tableType = tbCfg.type;
pMsg->sversion = 0;
pMsg->tversion = 0;
pMsg->suid = tbCfg.stbCfg.suid;
pMsg->tuid = tbCfg.stbCfg.suid;
for (size_t i = 0; i < tbCfg.stbCfg.nTagCols; i++) {
}
break;
case META_CHILD_TABLE:
ASSERT(0);
break;
case META_NORMAL_TABLE:
ASSERT(0);
break;
default:
ASSERT(0);
break;
}
*ppMsg = pMsg;
return 0;
} }
\ No newline at end of file
...@@ -46,58 +46,4 @@ size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t ...@@ -46,58 +46,4 @@ size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t
} }
return tlen; return tlen;
}
int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg) {
int tsize = 0;
tsize += taosEncodeString(pBuf, pTbCfg->name);
tsize += taosEncodeFixedU32(pBuf, pTbCfg->ttl);
tsize += taosEncodeFixedU32(pBuf, pTbCfg->keep);
tsize += taosEncodeFixedU8(pBuf, pTbCfg->type);
switch (pTbCfg->type) {
case META_SUPER_TABLE:
tsize += taosEncodeFixedU64(pBuf, pTbCfg->stbCfg.suid);
tsize += tdEncodeSchema(pBuf, pTbCfg->stbCfg.pSchema);
tsize += tdEncodeSchema(pBuf, pTbCfg->stbCfg.pTagSchema);
break;
case META_CHILD_TABLE:
tsize += taosEncodeFixedU64(pBuf, pTbCfg->ctbCfg.suid);
tsize += tdEncodeKVRow(pBuf, pTbCfg->ctbCfg.pTag);
break;
case META_NORMAL_TABLE:
tsize += tdEncodeSchema(pBuf, pTbCfg->ntbCfg.pSchema);
break;
default:
break;
}
return tsize;
}
void *metaDecodeTbCfg(void *pBuf, STbCfg *pTbCfg) {
pBuf = taosDecodeString(pBuf, &(pTbCfg->name));
pBuf = taosDecodeFixedU32(pBuf, &(pTbCfg->ttl));
pBuf = taosDecodeFixedU32(pBuf, &(pTbCfg->keep));
pBuf = taosDecodeFixedU8(pBuf, &(pTbCfg->type));
switch (pTbCfg->type) {
case META_SUPER_TABLE:
pBuf = taosDecodeFixedU64(pBuf, &(pTbCfg->stbCfg.suid));
pBuf = tdDecodeSchema(pBuf, &(pTbCfg->stbCfg.pSchema));
pBuf = tdDecodeSchema(pBuf, &(pTbCfg->stbCfg.pTagSchema));
break;
case META_CHILD_TABLE:
pBuf = taosDecodeFixedU64(pBuf, &(pTbCfg->ctbCfg.suid));
pBuf = tdDecodeKVRow(pBuf, &(pTbCfg->ctbCfg.pTag));
break;
case META_NORMAL_TABLE:
pBuf = tdDecodeSchema(pBuf, &(pTbCfg->ntbCfg.pSchema));
break;
default:
break;
}
return pBuf;
} }
\ No newline at end of file
...@@ -17,7 +17,8 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) { ...@@ -17,7 +17,8 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
return false; return false;
} }
static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** output, int32_t* outputLen, SMsgBuf* pMsgBuf) { static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen,
SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid name"; const char* msg1 = "invalid name";
const char* msg2 = "wildcard string should be less than %d characters"; const char* msg2 = "wildcard string should be less than %d characters";
const char* msg3 = "database name too long"; const char* msg3 = "database name too long";
...@@ -29,7 +30,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou ...@@ -29,7 +30,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou
* database prefix in pInfo->pMiscInfo->a[0] * database prefix in pInfo->pMiscInfo->a[0]
* wildcard in like clause in pInfo->pMiscInfo->a[1] * wildcard in like clause in pInfo->pMiscInfo->a[1]
*/ */
int16_t showType = pShowInfo->showType; int16_t showType = pShowInfo->showType;
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
SToken* pDbPrefixToken = &pShowInfo->prefix; SToken* pDbPrefixToken = &pShowInfo->prefix;
if (pDbPrefixToken->type != 0) { if (pDbPrefixToken->type != 0) {
...@@ -80,7 +81,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou ...@@ -80,7 +81,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou
} }
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
*outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/; *outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -116,8 +117,8 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { ...@@ -116,8 +117,8 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
int32_t val = htonl(pCreate->daysPerFile); int32_t val = htonl(pCreate->daysPerFile);
if (val != -1 && (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE)) { if (val != -1 && (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE)) {
snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE,
TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); TSDB_MAX_DAYS_PER_FILE);
return buildInvalidOperationMsg(pMsgBuf, msg); return buildInvalidOperationMsg(pMsgBuf, msg);
} }
...@@ -137,15 +138,15 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { ...@@ -137,15 +138,15 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
val = htonl(pCreate->commitTime); val = htonl(pCreate->commitTime);
if (val != -1 && (val < TSDB_MIN_COMMIT_TIME || val > TSDB_MAX_COMMIT_TIME)) { if (val != -1 && (val < TSDB_MIN_COMMIT_TIME || val > TSDB_MAX_COMMIT_TIME)) {
snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val, TSDB_MIN_COMMIT_TIME,
TSDB_MIN_COMMIT_TIME, TSDB_MAX_COMMIT_TIME); TSDB_MAX_COMMIT_TIME);
return buildInvalidOperationMsg(pMsgBuf, msg); return buildInvalidOperationMsg(pMsgBuf, msg);
} }
val = htonl(pCreate->fsyncPeriod); val = htonl(pCreate->fsyncPeriod);
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) { if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD,
TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); TSDB_MAX_FSYNC_PERIOD);
return buildInvalidOperationMsg(pMsgBuf, msg); return buildInvalidOperationMsg(pMsgBuf, msg);
} }
...@@ -284,7 +285,8 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) { ...@@ -284,7 +285,8 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, SEpSet* pEpSet) { int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len,
SEpSet* pEpSet) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched"; const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
...@@ -293,13 +295,13 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -293,13 +295,13 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
// super table name, create table by using dst // super table name, create table by using dst
int32_t numOfTables = (int32_t) taosArrayGetSize(pCreateTable->childTableInfo); int32_t numOfTables = (int32_t)taosArrayGetSize(pCreateTable->childTableInfo);
for(int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j);
SToken* pSTableNameToken = &pCreateTableInfo->stbName; SToken* pSTableNameToken = &pCreateTableInfo->stbName;
char buf[TSDB_TABLE_FNAME_LEN]; char buf[TSDB_TABLE_FNAME_LEN];
SToken sTblToken; SToken sTblToken;
sTblToken.z = buf; sTblToken.z = buf;
...@@ -315,7 +317,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -315,7 +317,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
} }
const char* pStableName = tNameGetTableName(&name); const char* pStableName = tNameGetTableName(&name);
SArray* pValList = pCreateTableInfo->pTagVals; SArray* pValList = pCreateTableInfo->pTagVals;
size_t numOfInputTag = taosArrayGetSize(pValList); size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL; STableMeta* pSuperTableMeta = NULL;
...@@ -327,9 +329,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -327,9 +329,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
assert(pSuperTableMeta != NULL); assert(pSuperTableMeta != NULL);
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta); SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
STableComInfo tinfo = getTableInfo(pSuperTableMeta); STableComInfo tinfo = getTableInfo(pSuperTableMeta);
STagData *pTag = &pCreateTableInfo->tagdata; STagData* pTag = &pCreateTableInfo->tagdata;
SKVRowBuilder kvRowBuilder = {0}; SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
...@@ -353,17 +355,17 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -353,17 +355,17 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
for (int32_t i = 0; i < nameSize; ++i) { for (int32_t i = 0; i < nameSize; ++i) {
SToken* sToken = taosArrayGet(pNameList, i); SToken* sToken = taosArrayGet(pNameList, i);
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // create tmp buf to avoid alter orginal sqlstr char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // create tmp buf to avoid alter orginal sqlstr
strncpy(tmpTokenBuf, sToken->z, sToken->n); strncpy(tmpTokenBuf, sToken->z, sToken->n);
sToken->z = tmpTokenBuf; sToken->z = tmpTokenBuf;
// if (TK_STRING == sToken->type) { // if (TK_STRING == sToken->type) {
// tscDequoteAndTrimToken(sToken); // tscDequoteAndTrimToken(sToken);
// } // }
// if (TK_ID == sToken->type) { // if (TK_ID == sToken->type) {
// tscRmEscapeAndTrimToken(sToken); // tscRmEscapeAndTrimToken(sToken);
// } // }
SListItem* pItem = taosArrayGet(pValList, i); SListItem* pItem = taosArrayGet(pValList, i);
...@@ -372,7 +374,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -372,7 +374,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
// todo speedup by using hash list // todo speedup by using hash list
for (int32_t t = 0; t < schemaSize; ++t) { for (int32_t t = 0; t < schemaSize; ++t) {
if (strncmp(sToken->z, pTagSchema[t].name, sToken->n) == 0 && strlen(pTagSchema[t].name) == sToken->n) { if (strncmp(sToken->z, pTagSchema[t].name, sToken->n) == 0 && strlen(pTagSchema[t].name) == sToken->n) {
SSchema* pSchema = &pTagSchema[t]; SSchema* pSchema = &pTagSchema[t];
char tagVal[TSDB_MAX_TAGS_LEN] = {0}; char tagVal[TSDB_MAX_TAGS_LEN] = {0};
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
...@@ -382,10 +384,10 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -382,10 +384,10 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
} }
} else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { } else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
// code = convertTimestampStrToInt64(&(pItem->pVar), tinfo.precision); // code = convertTimestampStrToInt64(&(pItem->pVar), tinfo.precision);
// if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
// return buildInvalidOperationMsg(pMsgBuf, msg4); // return buildInvalidOperationMsg(pMsgBuf, msg4);
// } // }
} else if (pItem->pVar.nType == TSDB_DATA_TYPE_TIMESTAMP) { } else if (pItem->pVar.nType == TSDB_DATA_TYPE_TIMESTAMP) {
pItem->pVar.i = convertTimePrecision(pItem->pVar.i, TSDB_TIME_PRECISION_NANO, tinfo.precision); pItem->pVar.i = convertTimePrecision(pItem->pVar.i, TSDB_TIME_PRECISION_NANO, tinfo.precision);
} }
...@@ -416,7 +418,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -416,7 +418,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
if (!findColumnIndex) { if (!findColumnIndex) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
// return buildInvalidOperationMsg(pMsgBuf, "invalid tag name", sToken->z); // return buildInvalidOperationMsg(pMsgBuf, "invalid tag name", sToken->z);
} }
} }
} else { } else {
...@@ -426,8 +428,8 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -426,8 +428,8 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
} }
for (int32_t i = 0; i < numOfInputTag; ++i) { for (int32_t i = 0; i < numOfInputTag; ++i) {
SSchema *pSchema = &pTagSchema[i]; SSchema* pSchema = &pTagSchema[i];
SToken* pItem = taosArrayGet(pValList, i); SToken* pItem = taosArrayGet(pValList, i);
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
if (pItem->n > pSchema->bytes) { if (pItem->n > pSchema->bytes) {
...@@ -435,17 +437,17 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -435,17 +437,17 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
return buildInvalidOperationMsg(pMsgBuf, msg3); return buildInvalidOperationMsg(pMsgBuf, msg3);
} }
} else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { } else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
// if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { // if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
//// code = convertTimestampStrToInt64(&(pItem->pVar), tinfo.precision); //// code = convertTimestampStrToInt64(&(pItem->pVar), tinfo.precision);
// if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
// return buildInvalidOperationMsg(pMsgBuf, msg4); // return buildInvalidOperationMsg(pMsgBuf, msg4);
// } // }
// } else if (pItem->pVar.nType == TSDB_DATA_TYPE_TIMESTAMP) { // } else if (pItem->pVar.nType == TSDB_DATA_TYPE_TIMESTAMP) {
// pItem->pVar.i = convertTimePrecision(pItem->pVar.i, TSDB_TIME_PRECISION_NANO, tinfo.precision); // pItem->pVar.i = convertTimePrecision(pItem->pVar.i, TSDB_TIME_PRECISION_NANO, tinfo.precision);
// } // }
} }
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
char* endPtr = NULL; char* endPtr = NULL;
...@@ -478,32 +480,35 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -478,32 +480,35 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.suid = pSuperTableMeta->suid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
int32_t serLen = tSerializeSVCreateTbReq(NULL, &req); int32_t serLen = sizeof(SMsgHead) + tSerializeSVCreateTbReq(NULL, &req);
char* buf1 = calloc(1, serLen); char* buf1 = calloc(1, serLen);
char* p = buf1; *pOutput = buf1;
tSerializeSVCreateTbReq((void*) &buf1, &req); buf1 += sizeof(SMsgHead);
*pOutput = p; tSerializeSVCreateTbReq((void*)&buf1, &req);
*len = serLen; *len = serLen;
SVgroupInfo info = {0}; SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, req.name, &info); catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, req.name, &info);
pEpSet->inUse = info.inUse; pEpSet->inUse = info.inUse;
pEpSet->numOfEps = info.numOfEps; pEpSet->numOfEps = info.numOfEps;
for(int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] = info.epAddr[i].port; pEpSet->port[i] = info.epAddr[i].port;
tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
} }
((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen) { int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf,
int32_t msgBufLen) {
int32_t code = 0; int32_t code = 0;
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf *pMsgBuf = &m; SMsgBuf* pMsgBuf = &m;
switch (pInfo->type) { switch (pInfo->type) {
case TSDB_SQL_CREATE_USER: case TSDB_SQL_CREATE_USER:
...@@ -551,7 +556,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -551,7 +556,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} }
pDcl->pMsg = (char*)buildUserManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->pMsg = (char*)buildUserManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_USER)? TDMT_MND_CREATE_USER:TDMT_MND_ALTER_USER; pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_USER) ? TDMT_MND_CREATE_USER : TDMT_MND_ALTER_USER;
break; break;
} }
...@@ -588,14 +593,14 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -588,14 +593,14 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} }
pDcl->pMsg = (char*)buildAcctManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->pMsg = (char*)buildAcctManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_ACCT)? TDMT_MND_CREATE_ACCT:TDMT_MND_ALTER_ACCT; pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_ACCT) ? TDMT_MND_CREATE_ACCT : TDMT_MND_ALTER_ACCT;
break; break;
} }
case TSDB_SQL_DROP_ACCT: case TSDB_SQL_DROP_ACCT:
case TSDB_SQL_DROP_USER: { case TSDB_SQL_DROP_USER: {
pDcl->pMsg = (char*)buildDropUserMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->pMsg = (char*)buildDropUserMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->msgType = (pInfo->type == TSDB_SQL_DROP_ACCT)? TDMT_MND_DROP_ACCT:TDMT_MND_DROP_USER; pDcl->msgType = (pInfo->type == TSDB_SQL_DROP_ACCT) ? TDMT_MND_DROP_ACCT : TDMT_MND_DROP_USER;
break; break;
} }
...@@ -613,13 +618,13 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -613,13 +618,13 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
return buildInvalidOperationMsg(pMsgBuf, msg); return buildInvalidOperationMsg(pMsgBuf, msg);
} }
SName n = {0}; SName n = {0};
int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); return buildInvalidOperationMsg(pMsgBuf, msg);
} }
SUseDbMsg *pUseDbMsg = (SUseDbMsg *) calloc(1, sizeof(SUseDbMsg)); SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
tNameExtractFullName(&n, pUseDbMsg->db); tNameExtractFullName(&n, pUseDbMsg->db);
pDcl->pMsg = (char*)pUseDbMsg; pDcl->pMsg = (char*)pUseDbMsg;
...@@ -638,7 +643,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -638,7 +643,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
return buildInvalidOperationMsg(pMsgBuf, msg2); return buildInvalidOperationMsg(pMsgBuf, msg2);
} }
char buf[TSDB_DB_NAME_LEN] = {0}; char buf[TSDB_DB_NAME_LEN] = {0};
SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf));
if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) {
...@@ -652,7 +657,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -652,7 +657,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
pDcl->pMsg = (char*)pCreateMsg; pDcl->pMsg = (char*)pCreateMsg;
pDcl->msgLen = sizeof(SCreateDbMsg); pDcl->msgLen = sizeof(SCreateDbMsg);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB)? TDMT_MND_CREATE_DB:TDMT_MND_ALTER_DB; pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB;
break; break;
} }
...@@ -668,7 +673,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -668,7 +673,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
SDropDbMsg *pDropDbMsg = (SDropDbMsg*) calloc(1, sizeof(SDropDbMsg)); SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
code = tNameExtractFullName(&name, pDropDbMsg->db); code = tNameExtractFullName(&name, pDropDbMsg->db);
pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
...@@ -688,9 +693,10 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -688,9 +693,10 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
return code; return code;
} }
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE)? TDMT_VND_CREATE_TABLE:TDMT_MND_CREATE_STB; pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) { } else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != TSDB_CODE_SUCCESS) { if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) !=
TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -714,7 +720,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -714,7 +720,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} }
case TSDB_SQL_CREATE_DNODE: { case TSDB_SQL_CREATE_DNODE: {
pDcl->pMsg = (char*) buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; code = terrno;
} }
...@@ -724,7 +730,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -724,7 +730,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} }
case TSDB_SQL_DROP_DNODE: { case TSDB_SQL_DROP_DNODE: {
pDcl->pMsg = (char*) buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; code = terrno;
} }
...@@ -739,4 +745,3 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -739,4 +745,3 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
return code; return code;
} }
...@@ -13,14 +13,14 @@ ...@@ -13,14 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "tbuffer.h" #include "tbuffer.h"
#include "exception.h" #include "exception.h"
#include "os.h"
//#include "taoserror.h" //#include "taoserror.h"
typedef union Un4B { typedef union Un4B {
uint32_t ui; uint32_t ui;
float f; float f;
} Un4B; } Un4B;
#if __STDC_VERSION__ >= 201112L #if __STDC_VERSION__ >= 201112L
static_assert(sizeof(Un4B) == sizeof(uint32_t), "sizeof(Un4B) must equal to sizeof(uint32_t)"); static_assert(sizeof(Un4B) == sizeof(uint32_t), "sizeof(Un4B) must equal to sizeof(uint32_t)");
...@@ -29,7 +29,7 @@ static_assert(sizeof(Un4B) == sizeof(float), "sizeof(Un4B) must equal to sizeof( ...@@ -29,7 +29,7 @@ static_assert(sizeof(Un4B) == sizeof(float), "sizeof(Un4B) must equal to sizeof(
typedef union Un8B { typedef union Un8B {
uint64_t ull; uint64_t ull;
double d; double d;
} Un8B; } Un8B;
#if __STDC_VERSION__ >= 201112L #if __STDC_VERSION__ >= 201112L
static_assert(sizeof(Un8B) == sizeof(uint64_t), "sizeof(Un8B) must equal to sizeof(uint64_t)"); static_assert(sizeof(Un8B) == sizeof(uint64_t), "sizeof(Un8B) must equal to sizeof(uint64_t)");
...@@ -40,172 +40,172 @@ static_assert(sizeof(Un8B) == sizeof(double), "sizeof(Un8B) must equal to sizeof ...@@ -40,172 +40,172 @@ static_assert(sizeof(Un8B) == sizeof(double), "sizeof(Un8B) must equal to sizeof
// reader functions // reader functions
size_t tbufSkip(SBufferReader* buf, size_t size) { size_t tbufSkip(SBufferReader* buf, size_t size) {
if( (buf->pos + size) > buf->size ) { if ((buf->pos + size) > buf->size) {
THROW( -1 ); THROW(-1);
} }
size_t old = buf->pos; size_t old = buf->pos;
buf->pos += size; buf->pos += size;
return old; return old;
} }
const char* tbufRead( SBufferReader* buf, size_t size ) { const char* tbufRead(SBufferReader* buf, size_t size) {
const char* ret = buf->data + buf->pos; const char* ret = buf->data + buf->pos;
tbufSkip( buf, size ); tbufSkip(buf, size);
return ret; return ret;
} }
void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size ) { void tbufReadToBuffer(SBufferReader* buf, void* dst, size_t size) {
assert( dst != NULL ); assert(dst != NULL);
// always using memcpy, leave optimization to compiler // always using memcpy, leave optimization to compiler
memcpy( dst, tbufRead(buf, size), size ); memcpy(dst, tbufRead(buf, size), size);
} }
static size_t tbufReadLength( SBufferReader* buf ) { static size_t tbufReadLength(SBufferReader* buf) {
// maximum length is 65535, if larger length is required // maximum length is 65535, if larger length is required
// this function and the corresponding write function need to be // this function and the corresponding write function need to be
// revised. // revised.
uint16_t l = tbufReadUint16( buf ); uint16_t l = tbufReadUint16(buf);
return l; return l;
} }
const char* tbufReadString( SBufferReader* buf, size_t* len ) { const char* tbufReadString(SBufferReader* buf, size_t* len) {
size_t l = tbufReadLength( buf ); size_t l = tbufReadLength(buf);
const char* ret = buf->data + buf->pos; const char* ret = buf->data + buf->pos;
tbufSkip( buf, l + 1 ); tbufSkip(buf, l + 1);
if( ret[l] != 0 ) { if (ret[l] != 0) {
THROW( -1 ); THROW(-1);
} }
if( len != NULL ) { if (len != NULL) {
*len = l; *len = l;
} }
return ret; return ret;
} }
size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ) { size_t tbufReadToString(SBufferReader* buf, char* dst, size_t size) {
assert( dst != NULL ); assert(dst != NULL);
size_t len; size_t len;
const char* str = tbufReadString( buf, &len ); const char* str = tbufReadString(buf, &len);
if (len >= size) { if (len >= size) {
len = size - 1; len = size - 1;
} }
memcpy( dst, str, len ); memcpy(dst, str, len);
dst[len] = 0; dst[len] = 0;
return len; return len;
} }
const char* tbufReadBinary( SBufferReader* buf, size_t *len ) { const char* tbufReadBinary(SBufferReader* buf, size_t* len) {
size_t l = tbufReadLength( buf ); size_t l = tbufReadLength(buf);
const char* ret = buf->data + buf->pos; const char* ret = buf->data + buf->pos;
tbufSkip( buf, l ); tbufSkip(buf, l);
if( len != NULL ) { if (len != NULL) {
*len = l; *len = l;
} }
return ret; return ret;
} }
size_t tbufReadToBinary( SBufferReader* buf, void* dst, size_t size ) { size_t tbufReadToBinary(SBufferReader* buf, void* dst, size_t size) {
assert( dst != NULL ); assert(dst != NULL);
size_t len; size_t len;
const char* data = tbufReadBinary( buf, &len ); const char* data = tbufReadBinary(buf, &len);
if( len >= size ) { if (len >= size) {
len = size; len = size;
} }
memcpy( dst, data, len ); memcpy(dst, data, len);
return len; return len;
} }
bool tbufReadBool( SBufferReader* buf ) { bool tbufReadBool(SBufferReader* buf) {
bool ret; bool ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
return ret; return ret;
} }
char tbufReadChar( SBufferReader* buf ) { char tbufReadChar(SBufferReader* buf) {
char ret; char ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
return ret; return ret;
} }
int8_t tbufReadInt8( SBufferReader* buf ) { int8_t tbufReadInt8(SBufferReader* buf) {
int8_t ret; int8_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
return ret; return ret;
} }
uint8_t tbufReadUint8( SBufferReader* buf ) { uint8_t tbufReadUint8(SBufferReader* buf) {
uint8_t ret; uint8_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
return ret; return ret;
} }
int16_t tbufReadInt16( SBufferReader* buf ) { int16_t tbufReadInt16(SBufferReader* buf) {
int16_t ret; int16_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
if( buf->endian ) { if (buf->endian) {
return (int16_t)ntohs( ret ); return (int16_t)ntohs(ret);
} }
return ret; return ret;
} }
uint16_t tbufReadUint16( SBufferReader* buf ) { uint16_t tbufReadUint16(SBufferReader* buf) {
uint16_t ret; uint16_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
if( buf->endian ) { if (buf->endian) {
return ntohs( ret ); return ntohs(ret);
} }
return ret; return ret;
} }
int32_t tbufReadInt32( SBufferReader* buf ) { int32_t tbufReadInt32(SBufferReader* buf) {
int32_t ret; int32_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
if( buf->endian ) { if (buf->endian) {
return (int32_t)ntohl( ret ); return (int32_t)ntohl(ret);
} }
return ret; return ret;
} }
uint32_t tbufReadUint32( SBufferReader* buf ) { uint32_t tbufReadUint32(SBufferReader* buf) {
uint32_t ret; uint32_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
if( buf->endian ) { if (buf->endian) {
return ntohl( ret ); return ntohl(ret);
} }
return ret; return ret;
} }
int64_t tbufReadInt64( SBufferReader* buf ) { int64_t tbufReadInt64(SBufferReader* buf) {
int64_t ret; int64_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
if( buf->endian ) { if (buf->endian) {
return (int64_t)htobe64( ret ); // TODO: ntohll return (int64_t)htobe64(ret); // TODO: ntohll
} }
return ret; return ret;
} }
uint64_t tbufReadUint64( SBufferReader* buf ) { uint64_t tbufReadUint64(SBufferReader* buf) {
uint64_t ret; uint64_t ret;
tbufReadToBuffer( buf, &ret, sizeof(ret) ); tbufReadToBuffer(buf, &ret, sizeof(ret));
if( buf->endian ) { if (buf->endian) {
return htobe64( ret ); // TODO: ntohll return htobe64(ret); // TODO: ntohll
} }
return ret; return ret;
} }
float tbufReadFloat( SBufferReader* buf ) { float tbufReadFloat(SBufferReader* buf) {
Un4B _un; Un4B _un;
tbufReadToBuffer( buf, &_un, sizeof(_un) ); tbufReadToBuffer(buf, &_un, sizeof(_un));
if( buf->endian ) { if (buf->endian) {
_un.ui = ntohl( _un.ui ); _un.ui = ntohl(_un.ui);
} }
return _un.f; return _un.f;
} }
double tbufReadDouble(SBufferReader* buf) { double tbufReadDouble(SBufferReader* buf) {
Un8B _un; Un8B _un;
tbufReadToBuffer( buf, &_un, sizeof(_un) ); tbufReadToBuffer(buf, &_un, sizeof(_un));
if( buf->endian ) { if (buf->endian) {
_un.ull = htobe64( _un.ull ); _un.ull = htobe64(_un.ull);
} }
return _un.d; return _un.d;
} }
...@@ -213,38 +213,38 @@ double tbufReadDouble(SBufferReader* buf) { ...@@ -213,38 +213,38 @@ double tbufReadDouble(SBufferReader* buf) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// writer functions // writer functions
void tbufCloseWriter( SBufferWriter* buf ) { void tbufCloseWriter(SBufferWriter* buf) {
tfree(buf->data); tfree(buf->data);
// (*buf->allocator)( buf->data, 0 ); // potential memory leak. // (*buf->allocator)( buf->data, 0 ); // potential memory leak.
buf->data = NULL; buf->data = NULL;
buf->pos = 0; buf->pos = 0;
buf->size = 0; buf->size = 0;
} }
void tbufEnsureCapacity( SBufferWriter* buf, size_t size ) { void tbufEnsureCapacity(SBufferWriter* buf, size_t size) {
size += buf->pos; size += buf->pos;
if( size > buf->size ) { if (size > buf->size) {
size_t nsize = size + buf->size; size_t nsize = size + buf->size;
char* data = (*buf->allocator)( buf->data, nsize ); char* data = (*buf->allocator)(buf->data, nsize);
// TODO: the exception should be thrown by the allocator function // TODO: the exception should be thrown by the allocator function
if( data == NULL ) { if (data == NULL) {
THROW( -1 ); THROW(-1);
} }
buf->data = data; buf->data = data;
buf->size = nsize; buf->size = nsize;
} }
} }
size_t tbufReserve( SBufferWriter* buf, size_t size ) { size_t tbufReserve(SBufferWriter* buf, size_t size) {
tbufEnsureCapacity( buf, size ); tbufEnsureCapacity(buf, size);
size_t old = buf->pos; size_t old = buf->pos;
buf->pos += size; buf->pos += size;
return old; return old;
} }
char* tbufGetData( SBufferWriter* buf, bool takeOver ) { char* tbufGetData(SBufferWriter* buf, bool takeOver) {
char* ret = buf->data; char* ret = buf->data;
if( takeOver ) { if (takeOver) {
buf->pos = 0; buf->pos = 0;
buf->size = 0; buf->size = 0;
buf->data = NULL; buf->data = NULL;
...@@ -252,192 +252,174 @@ char* tbufGetData( SBufferWriter* buf, bool takeOver ) { ...@@ -252,192 +252,174 @@ char* tbufGetData( SBufferWriter* buf, bool takeOver ) {
return ret; return ret;
} }
void tbufWrite( SBufferWriter* buf, const void* data, size_t size ) { void tbufWrite(SBufferWriter* buf, const void* data, size_t size) {
assert( data != NULL ); assert(data != NULL);
tbufEnsureCapacity( buf, size ); tbufEnsureCapacity(buf, size);
memcpy( buf->data + buf->pos, data, size ); memcpy(buf->data + buf->pos, data, size);
buf->pos += size; buf->pos += size;
} }
void tbufWriteAt( SBufferWriter* buf, size_t pos, const void* data, size_t size ) { void tbufWriteAt(SBufferWriter* buf, size_t pos, const void* data, size_t size) {
assert( data != NULL ); assert(data != NULL);
// this function can only be called to fill the gap on previous writes, // this function can only be called to fill the gap on previous writes,
// so 'pos + size <= buf->pos' must be true // so 'pos + size <= buf->pos' must be true
assert( pos + size <= buf->pos ); assert(pos + size <= buf->pos);
memcpy( buf->data + pos, data, size ); memcpy(buf->data + pos, data, size);
} }
static void tbufWriteLength( SBufferWriter* buf, size_t len ) { static void tbufWriteLength(SBufferWriter* buf, size_t len) {
// maximum length is 65535, if larger length is required // maximum length is 65535, if larger length is required
// this function and the corresponding read function need to be // this function and the corresponding read function need to be
// revised. // revised.
assert( len <= 0xffff ); assert(len <= 0xffff);
tbufWriteUint16( buf, (uint16_t)len ); tbufWriteUint16(buf, (uint16_t)len);
} }
void tbufWriteStringLen( SBufferWriter* buf, const char* str, size_t len ) { void tbufWriteStringLen(SBufferWriter* buf, const char* str, size_t len) {
tbufWriteLength( buf, len ); tbufWriteLength(buf, len);
tbufWrite( buf, str, len ); tbufWrite(buf, str, len);
tbufWriteChar( buf, '\0' ); tbufWriteChar(buf, '\0');
} }
void tbufWriteString( SBufferWriter* buf, const char* str ) { void tbufWriteString(SBufferWriter* buf, const char* str) { tbufWriteStringLen(buf, str, strlen(str)); }
tbufWriteStringLen( buf, str, strlen(str) );
}
void tbufWriteBinary( SBufferWriter* buf, const void* data, size_t len ) { void tbufWriteBinary(SBufferWriter* buf, const void* data, size_t len) {
tbufWriteLength( buf, len ); tbufWriteLength(buf, len);
tbufWrite( buf, data, len ); tbufWrite(buf, data, len);
} }
void tbufWriteBool( SBufferWriter* buf, bool data ) { void tbufWriteBool(SBufferWriter* buf, bool data) { tbufWrite(buf, &data, sizeof(data)); }
tbufWrite( buf, &data, sizeof(data) );
}
void tbufWriteBoolAt( SBufferWriter* buf, size_t pos, bool data ) { void tbufWriteBoolAt(SBufferWriter* buf, size_t pos, bool data) { tbufWriteAt(buf, pos, &data, sizeof(data)); }
tbufWriteAt( buf, pos, &data, sizeof(data) );
}
void tbufWriteChar( SBufferWriter* buf, char data ) { void tbufWriteChar(SBufferWriter* buf, char data) { tbufWrite(buf, &data, sizeof(data)); }
tbufWrite( buf, &data, sizeof(data) );
}
void tbufWriteCharAt( SBufferWriter* buf, size_t pos, char data ) { void tbufWriteCharAt(SBufferWriter* buf, size_t pos, char data) { tbufWriteAt(buf, pos, &data, sizeof(data)); }
tbufWriteAt( buf, pos, &data, sizeof(data) );
}
void tbufWriteInt8( SBufferWriter* buf, int8_t data ) { void tbufWriteInt8(SBufferWriter* buf, int8_t data) { tbufWrite(buf, &data, sizeof(data)); }
tbufWrite( buf, &data, sizeof(data) );
}
void tbufWriteInt8At( SBufferWriter* buf, size_t pos, int8_t data ) { void tbufWriteInt8At(SBufferWriter* buf, size_t pos, int8_t data) { tbufWriteAt(buf, pos, &data, sizeof(data)); }
tbufWriteAt( buf, pos, &data, sizeof(data) );
}
void tbufWriteUint8( SBufferWriter* buf, uint8_t data ) { void tbufWriteUint8(SBufferWriter* buf, uint8_t data) { tbufWrite(buf, &data, sizeof(data)); }
tbufWrite( buf, &data, sizeof(data) );
}
void tbufWriteUint8At( SBufferWriter* buf, size_t pos, uint8_t data ) { void tbufWriteUint8At(SBufferWriter* buf, size_t pos, uint8_t data) { tbufWriteAt(buf, pos, &data, sizeof(data)); }
tbufWriteAt( buf, pos, &data, sizeof(data) );
}
void tbufWriteInt16( SBufferWriter* buf, int16_t data ) { void tbufWriteInt16(SBufferWriter* buf, int16_t data) {
if( buf->endian ) { if (buf->endian) {
data = (int16_t)htons( data ); data = (int16_t)htons(data);
} }
tbufWrite( buf, &data, sizeof(data) ); tbufWrite(buf, &data, sizeof(data));
} }
void tbufWriteInt16At( SBufferWriter* buf, size_t pos, int16_t data ) { void tbufWriteInt16At(SBufferWriter* buf, size_t pos, int16_t data) {
if( buf->endian ) { if (buf->endian) {
data = (int16_t)htons( data ); data = (int16_t)htons(data);
} }
tbufWriteAt( buf, pos, &data, sizeof(data) ); tbufWriteAt(buf, pos, &data, sizeof(data));
} }
void tbufWriteUint16( SBufferWriter* buf, uint16_t data ) { void tbufWriteUint16(SBufferWriter* buf, uint16_t data) {
if( buf->endian ) { if (buf->endian) {
data = htons( data ); data = htons(data);
} }
tbufWrite( buf, &data, sizeof(data) ); tbufWrite(buf, &data, sizeof(data));
} }
void tbufWriteUint16At( SBufferWriter* buf, size_t pos, uint16_t data ) { void tbufWriteUint16At(SBufferWriter* buf, size_t pos, uint16_t data) {
if( buf->endian ) { if (buf->endian) {
data = htons( data ); data = htons(data);
} }
tbufWriteAt( buf, pos, &data, sizeof(data) ); tbufWriteAt(buf, pos, &data, sizeof(data));
} }
void tbufWriteInt32( SBufferWriter* buf, int32_t data ) { void tbufWriteInt32(SBufferWriter* buf, int32_t data) {
if( buf->endian ) { if (buf->endian) {
data = (int32_t)htonl( data ); data = (int32_t)htonl(data);
} }
tbufWrite( buf, &data, sizeof(data) ); tbufWrite(buf, &data, sizeof(data));
} }
void tbufWriteInt32At( SBufferWriter* buf, size_t pos, int32_t data ) { void tbufWriteInt32At(SBufferWriter* buf, size_t pos, int32_t data) {
if( buf->endian ) { if (buf->endian) {
data = (int32_t)htonl( data ); data = (int32_t)htonl(data);
} }
tbufWriteAt( buf, pos, &data, sizeof(data) ); tbufWriteAt(buf, pos, &data, sizeof(data));
} }
void tbufWriteUint32( SBufferWriter* buf, uint32_t data ) { void tbufWriteUint32(SBufferWriter* buf, uint32_t data) {
if( buf->endian ) { if (buf->endian) {
data = htonl( data ); data = htonl(data);
} }
tbufWrite( buf, &data, sizeof(data) ); tbufWrite(buf, &data, sizeof(data));
} }
void tbufWriteUint32At( SBufferWriter* buf, size_t pos, uint32_t data ) { void tbufWriteUint32At(SBufferWriter* buf, size_t pos, uint32_t data) {
if( buf->endian ) { if (buf->endian) {
data = htonl( data ); data = htonl(data);
} }
tbufWriteAt( buf, pos, &data, sizeof(data) ); tbufWriteAt(buf, pos, &data, sizeof(data));
} }
void tbufWriteInt64( SBufferWriter* buf, int64_t data ) { void tbufWriteInt64(SBufferWriter* buf, int64_t data) {
if( buf->endian ) { if (buf->endian) {
data = (int64_t)htobe64( data ); data = (int64_t)htobe64(data);
} }
tbufWrite( buf, &data, sizeof(data) ); tbufWrite(buf, &data, sizeof(data));
} }
void tbufWriteInt64At( SBufferWriter* buf, size_t pos, int64_t data ) { void tbufWriteInt64At(SBufferWriter* buf, size_t pos, int64_t data) {
if( buf->endian ) { if (buf->endian) {
data = (int64_t)htobe64( data ); data = (int64_t)htobe64(data);
} }
tbufWriteAt( buf, pos, &data, sizeof(data) ); tbufWriteAt(buf, pos, &data, sizeof(data));
} }
void tbufWriteUint64( SBufferWriter* buf, uint64_t data ) { void tbufWriteUint64(SBufferWriter* buf, uint64_t data) {
if( buf->endian ) { if (buf->endian) {
data = htobe64( data ); data = htobe64(data);
} }
tbufWrite( buf, &data, sizeof(data) ); tbufWrite(buf, &data, sizeof(data));
} }
void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ) { void tbufWriteUint64At(SBufferWriter* buf, size_t pos, uint64_t data) {
if( buf->endian ) { if (buf->endian) {
data = htobe64( data ); data = htobe64(data);
} }
tbufWriteAt( buf, pos, &data, sizeof(data) ); tbufWriteAt(buf, pos, &data, sizeof(data));
} }
void tbufWriteFloat( SBufferWriter* buf, float data ) { void tbufWriteFloat(SBufferWriter* buf, float data) {
Un4B _un; Un4B _un;
_un.f = data; _un.f = data;
if( buf->endian ) { if (buf->endian) {
_un.ui = htonl( _un.ui ); _un.ui = htonl(_un.ui);
} }
tbufWrite( buf, &_un, sizeof(_un) ); tbufWrite(buf, &_un, sizeof(_un));
} }
void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ) { void tbufWriteFloatAt(SBufferWriter* buf, size_t pos, float data) {
Un4B _un; Un4B _un;
_un.f = data; _un.f = data;
if( buf->endian ) { if (buf->endian) {
_un.ui = htonl( _un.ui ); _un.ui = htonl(_un.ui);
} }
tbufWriteAt( buf, pos, &_un, sizeof(_un) ); tbufWriteAt(buf, pos, &_un, sizeof(_un));
} }
void tbufWriteDouble( SBufferWriter* buf, double data ) { void tbufWriteDouble(SBufferWriter* buf, double data) {
Un8B _un; Un8B _un;
_un.d = data; _un.d = data;
if( buf->endian ) { if (buf->endian) {
_un.ull = htobe64( _un.ull ); _un.ull = htobe64(_un.ull);
} }
tbufWrite( buf, &_un, sizeof(_un) ); tbufWrite(buf, &_un, sizeof(_un));
} }
void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data ) { void tbufWriteDoubleAt(SBufferWriter* buf, size_t pos, double data) {
Un8B _un; Un8B _un;
_un.d = data; _un.d = data;
if( buf->endian ) { if (buf->endian) {
_un.ull = htobe64( _un.ull ); _un.ull = htobe64(_un.ull);
} }
tbufWriteAt( buf, pos, &_un, sizeof(_un) ); tbufWriteAt(buf, pos, &_un, sizeof(_un));
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册