提交 c1bbea8c 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

......@@ -57,46 +57,6 @@ extern int tMsgDict[];
typedef uint16_t tmsg_t;
/* ------------------------ ENCODE/DECODE FUNCTIONS AND MACROS ------------------------ */
struct SMEListNode {
TD_SLIST_NODE(SMEListNode);
SEncoder coder;
};
typedef struct SMsgEncoder {
SEncoder coder;
TD_SLIST(SMEListNode) eStack; // encode stack
} SMsgEncoder;
struct SMDFreeListNode {
TD_SLIST_NODE(SMDFreeListNode);
char payload[];
};
struct SMDListNode {
TD_SLIST_NODE(SMDListNode);
SDecoder coder;
};
typedef struct SMsgDecoder {
SDecoder coder;
TD_SLIST(SMDListNode) dStack;
TD_SLIST(SMDFreeListNode) freeList;
} SMsgDecoder;
#define TMSG_MALLOC(SIZE, DECODER) \
({ \
void* ptr = malloc((SIZE) + sizeof(struct SMDFreeListNode)); \
if (ptr) { \
TD_SLIST_PUSH(&((DECODER)->freeList), (struct SMDFreeListNode*)ptr); \
ptr = POINTER_SHIFT(ptr, sizeof(struct SMDFreeListNode*)); \
} \
ptr; \
})
void tmsgInitMsgDecoder(SMsgDecoder* pMD, td_endian_t endian, uint8_t* data, int64_t size);
void tmsgClearMsgDecoder(SMsgDecoder* pMD);
/* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type
#define TSDB_IE_TYPE_SEC 1
......@@ -1286,10 +1246,10 @@ typedef struct {
SArray* pArray;
} SVCreateTbBatchReq;
int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq);
int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq);
int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq);
int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
typedef struct SVCreateTbRsp {
} SVCreateTbRsp;
......
......@@ -40,8 +40,9 @@ enum {
// the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" )
......
......@@ -27,7 +27,7 @@ extern "C" {
#include "tname.h"
#include "tvariant.h"
/*
/**
* The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode.
*/
......@@ -157,7 +157,7 @@ typedef struct SVgDataBlocks {
typedef struct SInsertStmtInfo {
int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
......
......@@ -120,11 +120,12 @@ typedef struct SSubplanId {
} SSubplanId;
typedef struct SSubplan {
SSubplanId id; // unique id of the subplan
SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t level; // the execution level of current subplan, starting from 0.
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SArray *pChildern; // the datasource subplan,from which to fetch the result
SArray *pChildren; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan
SDataSink *pDataSink; // data of the subplan flow into the datasink
......
此差异已折叠。
......@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util common transport parser planner catalog scheduler function qcom
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
}
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
......
......@@ -29,7 +29,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
// this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) {
tscDebug("start to cleanup client environment");
tscInfo("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return;
......@@ -47,6 +47,8 @@ void taos_cleanup(void) {
rpcCleanup();
taosCloseLog();
tscInfo("all local resources released");
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
......@@ -140,7 +142,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
SRequestObj *pRequest = (SRequestObj *) pRes;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) {
pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS ||
taos_num_fields(pRes) == 0) {
return NULL;
}
......
......@@ -279,7 +279,7 @@ TEST(testCase, connect_Test) {
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// // assert(pConn != NULL);
......@@ -470,9 +470,9 @@ TEST(testCase, create_multiple_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)");
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
......
......@@ -12,4 +12,6 @@ target_link_libraries(
INTERFACE api
)
ADD_SUBDIRECTORY(test)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
......@@ -27,78 +27,7 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
static int tmsgStartEncode(SMsgEncoder *pME);
static void tmsgEndEncode(SMsgEncoder *pME);
static int tmsgStartDecode(SMsgDecoder *pMD);
static void tmsgEndDecode(SMsgDecoder *pMD);
/* ------------------------ ENCODE/DECODE FUNCTIONS ------------------------ */
void tmsgInitMsgEncoder(SMsgEncoder *pME, td_endian_t endian, uint8_t *data, int64_t size) {
tInitEncoder(&(pME->coder), endian, data, size);
TD_SLIST_INIT(&(pME->eStack));
}
void tmsgClearMsgEncoder(SMsgEncoder *pME) {
struct SMEListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pME->eStack));
if (TD_IS_NULL(pNode)) break;
TD_SLIST_POP(&(pME->eStack));
free(pNode);
}
}
void tmsgInitMsgDecoder(SMsgDecoder *pMD, td_endian_t endian, uint8_t *data, int64_t size) {
tInitDecoder(&pMD->coder, endian, data, size);
TD_SLIST_INIT(&(pMD->dStack));
TD_SLIST_INIT(&(pMD->freeList));
}
void tmsgClearMsgDecoder(SMsgDecoder *pMD) {
{
struct SMDFreeListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pMD->freeList));
if (TD_IS_NULL(pNode)) break;
TD_SLIST_POP(&(pMD->freeList));
free(pNode);
}
}
{
struct SMDListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pMD->dStack));
if (TD_IS_NULL(pNode)) break;
TD_SLIST_POP(&(pMD->dStack));
free(pNode);
}
}
}
/* ------------------------ MESSAGE ENCODE/DECODE ------------------------ */
int tmsgSVCreateTbReqEncode(SMsgEncoder *pCoder, SVCreateTbReq *pReq) {
tmsgStartEncode(pCoder);
// TODO
tmsgEndEncode(pCoder);
return 0;
}
int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) {
tmsgStartDecode(pCoder);
// TODO: decode
// Decode is not end
if (pCoder->coder.pos != pCoder->coder.size) {
// Continue decode
}
tmsgEndDecode(pCoder);
return 0;
}
int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) {
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
......@@ -193,62 +122,29 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
return buf;
}
/* ------------------------ STATIC METHODS ------------------------ */
static int tmsgStartEncode(SMsgEncoder *pME) {
struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode));
if (TD_IS_NULL(pNode)) return -1;
pNode->coder = pME->coder;
TD_SLIST_PUSH(&(pME->eStack), pNode);
TD_CODER_MOVE_POS(&(pME->coder), sizeof(int32_t));
return 0;
}
static void tmsgEndEncode(SMsgEncoder *pME) {
int32_t size;
struct SMEListNode *pNode;
pNode = TD_SLIST_HEAD(&(pME->eStack));
ASSERT(pNode);
TD_SLIST_POP(&(pME->eStack));
size = pME->coder.pos - pNode->coder.pos;
tEncodeI32(&(pNode->coder), size);
free(pNode);
}
static int tmsgStartDecode(SMsgDecoder *pMD) {
struct SMDListNode *pNode;
int32_t size;
pNode = (struct SMDListNode *)malloc(sizeof(*pNode));
if (pNode == NULL) return -1;
tDecodeI32(&(pMD->coder), &size);
pNode->coder = pMD->coder;
TD_SLIST_PUSH(&(pMD->dStack), pNode);
int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
int tlen = 0;
pMD->coder.pos = 0;
pMD->coder.size = size - sizeof(int32_t);
pMD->coder.data = TD_CODER_CURRENT(&(pNode->coder));
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i);
tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq);
}
return 0;
return tlen;
}
static void tmsgEndDecode(SMsgDecoder *pMD) {
ASSERT(pMD->coder.pos == pMD->coder.size);
struct SMDListNode *pNode;
pNode = TD_SLIST_HEAD(&(pMD->dStack));
ASSERT(pNode);
TD_SLIST_POP(&(pMD->dStack));
pNode->coder.pos += pMD->coder.size;
void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0;
pMD->coder = pNode->coder;
buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeFixedU32(buf, &nsize);
for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req;
buf = tDeserializeSVCreateTbReq(buf, &req);
taosArrayPush(pReq->pArray, &req);
}
free(pNode);
return buf;
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
}
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg * pMsg;
SRpcMsg *pMsg;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
......@@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
......@@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB:
case TDMT_VND_CREATE_TABLE:
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error
......@@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the requst struct
break;
case TDMT_VND_CREATE_TABLE:
tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
}
}
case TDMT_VND_DROP_STB:
case TDMT_VND_DROP_TABLE:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
......
......@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy);
extern void* indexQhandle;
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
char* colName;
int32_t nColName;
} ICacheKey;
int indexFlushCacheTFile(SIndex* sIdx, void*);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
......
......@@ -42,6 +42,7 @@ typedef struct IndexCache {
int32_t version;
int32_t nTerm;
int8_t type;
uint64_t suid;
pthread_mutex_t mtx;
} IndexCache;
......@@ -58,7 +59,7 @@ typedef struct CacheTerm {
} CacheTerm;
//
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
void indexCacheDestroy(void* cache);
......
......@@ -38,6 +38,7 @@ typedef struct WriterCtx {
int fd;
bool readOnly;
char buf[256];
int size;
} file;
struct {
int32_t capa;
......
......@@ -49,13 +49,6 @@ typedef struct TFileValue {
int32_t offset;
} TFileValue;
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
char* colName;
int32_t nColName;
} TFileCacheKey;
// table cache
// refactor to LRU cache later
typedef struct TFileCache {
......@@ -103,10 +96,10 @@ typedef struct TFileReaderOpt {
// tfile cache, manage tindex reader
TFileCache* tfileCacheCreate(const char* path);
void tfileCacheDestroy(TFileCache* tcache);
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key);
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader);
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName);
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx);
......@@ -124,6 +117,7 @@ int tfileWriterFinish(TFileWriter* tw);
//
IndexTFile* indexTFileCreate(const char* path);
void indexTFileDestroy(IndexTFile* tfile);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
......
......@@ -34,7 +34,7 @@ extern "C" {
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
type c = var; \
assert(sizeof(var) == sizeof(type)); \
assert(sizeof(type) == sizeof(c)); \
memcpy((void*)buf, (void*)&c, sizeof(c)); \
buf += sizeof(c); \
} while (0)
......
......@@ -17,6 +17,7 @@
#include "indexInt.h"
#include "index_cache.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tdef.h"
#include "tsched.h"
......@@ -72,6 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { goto END; }
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1);
......@@ -82,6 +84,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return 0;
#endif
END:
if (sIdx != NULL) { indexClose(sIdx); }
*index = NULL;
return -1;
......@@ -102,6 +106,7 @@ void indexClose(SIndex* sIdx) {
}
taosHashCleanup(sIdx->colObj);
pthread_mutex_destroy(&sIdx->mtx);
indexTFileDestroy(sIdx->tindex);
#endif
free(sIdx->path);
free(sIdx);
......@@ -130,18 +135,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range
pthread_mutex_lock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
if (cache == NULL) {
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
}
}
pthread_mutex_unlock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL);
int ret = indexCachePut(*cache, p, uid);
if (ret != 0) { return ret; }
......@@ -200,7 +215,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
indexInterResultsDestroy(interResults);
#endif
return 1;
return 0;
}
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
......@@ -296,7 +311,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
// Get col info
IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
if (pCache == NULL) {
pthread_mutex_unlock(&sIdx->mtx);
return -1;
......@@ -360,6 +380,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
if (sz > 0) {
// TODO(yihao): remove duplicate tableid
TFileValue* lv = taosArrayGetP(result, sz - 1);
// indexError("merge colVal: %s", lv->colVal);
if (strcmp(lv->colVal, tv->colVal) == 0) {
taosArrayAddAll(lv->tableId, tv->tableId);
tfileValueDestroy(tv);
......@@ -368,6 +389,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
}
} else {
taosArrayPush(result, &tv);
// indexError("merge colVal: %s", tv->colVal);
}
}
static void indexDestroyTempResult(SArray* result) {
......@@ -383,10 +405,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) { indexWarn("empty pReader found"); }
// handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
Iterate* tfileIter = tfileIteratorCreate(pReader);
if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
SArray* result = taosArrayInit(1024, sizeof(void*));
......@@ -459,14 +483,14 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
} else {
if (value->val != NULL) { taosArrayClear(value->val); }
}
// free(value->colVal);
free(value->colVal);
value->colVal = NULL;
}
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t version = CACHE_VERSION(cache);
uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType);
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
if (tw == NULL) {
indexError("failed to open file to write");
return -1;
......@@ -479,14 +503,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
}
tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName);
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
char buf[128] = {0};
TFileHeader* header = &reader->header;
ICacheKey key = {
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
char buf[128] = {0};
TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
......@@ -497,3 +520,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
END:
tfileWriterClose(tw);
}
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
char* p = buf;
SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
return buf - p;
}
......@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10000 * 10
#define MEM_TERM_LIMIT 5 * 10000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
......@@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera);
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
IndexCache* cache = calloc(1, sizeof(IndexCache));
if (cache == NULL) {
indexError("failed to create index cache");
......@@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
cache->type = type;
cache->index = idx;
cache->version = 0;
cache->suid = suid;
pthread_mutex_init(&cache->mtx, NULL);
indexCacheRef(cache);
return cache;
......@@ -150,6 +150,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
MemTable* tbl = cache->imm;
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->val.colVal = NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = indexCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue;
......@@ -353,6 +354,9 @@ static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; }
IterateValue* iv = &itera->val;
if (iv->colVal != NULL && iv->val != NULL) {
// indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter);
......
......@@ -319,7 +319,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
assert(s->state == OneTransNext || s->state == OneTrans);
uint8_t val;
COMMON_INDEX(inp, 0x111111, val);
COMMON_INDEX(inp, 0b111111, val);
s->val = (s->val & fstStateDict[s->state].val) | val;
}
......@@ -369,7 +369,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
bool null = false;
uint8_t inp = fstStateCommInput(s, &null);
uint8_t* data = fstSliceData(slice, NULL);
return null == false ? inp : data[-1];
return null == false ? inp : data[node->start - 1];
}
uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
assert(s->state == AnyTrans);
......@@ -1062,6 +1062,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
} else {
*null = true;
}
fstNodeDestroy(node);
return res;
}
......@@ -1286,6 +1287,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState);
free(buf);
fstSliceDestroy(&slice);
taosArrayDestroy(nodes);
return result;
}
free(buf);
......
......@@ -72,9 +72,17 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if (readOnly == false) {
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenCreateWriteAppend(path);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
} else {
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenRead(path);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
}
memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.fd < 0) {
......@@ -104,6 +112,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
free(ctx->mem.buf);
} else {
tfClose(ctx->file.fd);
ctx->flush(ctx);
if (remove) { unlink(ctx->file.buf); }
}
free(ctx);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
p *
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
......@@ -45,13 +45,13 @@ static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static int tfileGetFileList(const char* path, SArray* result);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
static SArray* tfileGetFileList(const char* path);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version);
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version);
TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache));
......@@ -60,38 +60,41 @@ TFileCache* tfileCacheCreate(const char* path) {
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64;
SArray* files = taosArrayInit(4, sizeof(void*));
tfileGetFileList(path, files);
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
SArray* files = tfileGetFileList(path);
uint64_t suid;
int32_t colId, version;
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i);
if (0 != tfileParseFileName(file, &suid, (int*)&colId, (int*)&version)) {
// refactor later, use colname and version info
char colName[256] = {0};
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
indexInfo("try parse invalid file: %s, skip it", file);
continue;
}
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
char fullName[256] = {0};
sprintf(fullName, "%s/%s", path, file);
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
if (wc == NULL) {
indexError("failed to open index:%s", file);
goto End;
}
char buf[128] = {0};
TFileReader* reader = tfileReaderCreate(wc);
TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
tfileSerialCacheKey(&key, buf);
char buf[128] = {0};
TFileReader* reader = tfileReaderCreate(wc);
TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
int32_t sz = indexSerialCacheKey(&key, buf);
assert(sz < sizeof(buf));
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader);
// indexTable
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
}
taosArrayDestroyEx(files, tfileDestroyFileName);
return tcache;
......@@ -117,30 +120,30 @@ void tfileCacheDestroy(TFileCache* tcache) {
free(tcache);
}
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf);
assert(sz < sizeof(buf));
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL) { return NULL; }
tfileReaderRef(*reader);
return *reader;
}
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf);
// remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf));
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL) {
TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, strlen(buf));
taosHashRemove(tcache->tableCache, buf, sz);
oldReader->remove = true;
tfileReaderUnRef(oldReader);
}
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
return;
}
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
......@@ -201,12 +204,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) { return NULL; }
......@@ -219,19 +219,15 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return tfileWriterCreate(wcx, &tfh);
}
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
// indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) { return NULL; }
TFileReader* reader = tfileReaderCreate(wc);
return reader;
// tfileSerialCacheKey(&key, buf);
}
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
// char pathBuf[128] = {0};
......@@ -325,17 +321,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
tfileWriterClose(tw);
return -1;
}
// write fst
indexError("--------Begin----------------");
// write data
for (size_t i = 0; i < sz; i++) {
// TODO, fst batch write later
TFileValue* v = taosArrayGetP((SArray*)data, i);
if (tfileWriteData(tw, v) == 0) {
//
if (tfileWriteData(tw, v) != 0) {
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId));
} else {
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
}
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
}
indexError("--------End----------------");
fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb);
tw->fb = NULL;
......@@ -359,7 +357,8 @@ IndexTFile* indexTFileCreate(const char* path) {
tfile->cache = tfileCacheCreate(path);
return tfile;
}
void IndexTFileDestroy(IndexTFile* tfile) {
void indexTFileDestroy(IndexTFile* tfile) {
if (tfile == NULL) { return; }
tfileCacheDestroy(tfile->cache);
free(tfile);
}
......@@ -369,9 +368,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term;
TFileCacheKey key = {
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
SIndexTerm* term = query->term;
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
if (reader == NULL) { return 0; }
......@@ -385,8 +383,10 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
}
static bool tfileIteratorNext(Iterate* iiter) {
IterateValue* iv = &iiter->val;
if (iv->colVal != NULL && iv->val != NULL) {
// indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy(iv, false);
// SArray* tblIds = iv->val;
char* colVal = NULL;
uint64_t offset = 0;
......@@ -406,14 +406,14 @@ static bool tfileIteratorNext(Iterate* iiter) {
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
iv->colVal = colVal;
return true;
// std::string key(ch, sz);
}
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter));
if (tIter == NULL) { return NULL; }
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
......@@ -435,6 +435,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
iter->next = tfileIteratorNext;
iter->getValue = tifileIterateGetValue;
iter->val.val = taosArrayInit(1, sizeof(uint64_t));
iter->val.colVal = NULL;
return iter;
}
void tfileIteratorDestroy(Iterate* iter) {
......@@ -447,13 +448,14 @@ void tfileIteratorDestroy(Iterate* iter) {
streamWithStateDestroy(tIter->st);
fstStreamBuilderDestroy(tIter->fb);
automCtxDestroy(tIter->ctx);
free(tIter);
free(iter);
}
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
if (tf == NULL) { return NULL; }
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key);
}
......@@ -480,7 +482,7 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue* tfileValueCreate(char* val) {
TFileValue* tf = calloc(1, sizeof(TFileValue));
if (tf == NULL) { return NULL; }
tf->colVal = val;
tf->colVal = tstrdup(val);
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
return tf;
}
......@@ -491,6 +493,7 @@ int tfileValuePush(TFileValue* tf, uint64_t val) {
}
void tfileValueDestroy(TFileValue* tf) {
taosArrayDestroy(tf->tableId);
free(tf->colVal);
free(tf);
}
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
......@@ -545,6 +548,9 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
//
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
} else {
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
}
// assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf));
......@@ -553,13 +559,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
}
static int tfileReaderLoadFst(TFileReader* reader) {
// current load fst into memory, refactor it later
static int FST_MAX_SIZE = 64 * 1024;
static int FST_MAX_SIZE = 64 * 1024 * 1024;
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
if (buf == NULL) { return -1; }
WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE);
......@@ -603,19 +610,26 @@ void tfileReaderUnRef(TFileReader* reader) {
}
}
static int tfileGetFileList(const char* path, SArray* result) {
static SArray* tfileGetFileList(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*));
DIR* dir = opendir(path);
if (NULL == dir) { return -1; }
if (NULL == dir) { return NULL; }
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
if (entry->d_type && DT_DIR) { continue; }
size_t len = strlen(entry->d_name);
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len);
taosArrayPush(result, &buf);
taosArrayPush(files, &buf);
}
closedir(dir);
return 0;
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
return files;
}
static int tfileRmExpireFile(SArray* result) {
// TODO(yihao): remove expire tindex after restart
......@@ -636,22 +650,21 @@ static int tfileCompare(const void* a, const void* b) {
if (ret == 0) { return ret; }
return ret < 0 ? -1 : 1;
}
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) {
sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version);
return;
}
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) {
// read suid & colid & version success
return 0;
}
return -1;
}
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) {
sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version);
return;
}
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) {
char filename[128] = {0};
tfileGenFileName(filename, suid, col, version);
sprintf(fullname, "%s/%s", path, filename);
}
......@@ -24,8 +24,13 @@ class FstWriter {
_b = fstBuilderCreate(_wc, 0);
}
bool Put(const std::string& key, uint64_t val) {
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
bool ok = fstBuilderInsert(_b, skey, val);
fstSliceDestroy(&skey);
return ok;
}
......@@ -61,6 +66,11 @@ class FstReadMemory {
return _fst != NULL;
}
bool Get(const std::string& key, uint64_t* val) {
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
bool ok = fstGet(_fst, &skey, val);
fstSliceDestroy(&skey);
......@@ -135,15 +145,109 @@ int Performance_fstWriteRecords(FstWriter* b) {
}
return L * M * N;
}
void Performance_fstReadRecords(FstReadMemory* m) {
std::string str("aa");
for (int i = 0; i < M; i++) {
str[0] = 'a' + i;
str.resize(2);
for (int j = 0; j < N; j++) {
str[1] = 'a' + j;
str.resize(2);
for (int k = 0; k < L; k++) {
str.push_back('a');
uint64_t val, cost;
if (m->GetWithTimeCostUs(str, &val, &cost)) {
printf("succes to get kv(%s, %" PRId64 "), cost: %" PRId64 "\n", str.c_str(), val, cost);
} else {
printf("failed to get key: %s\n", str.c_str());
}
}
}
}
}
void checkMillonWriteAndReadOfFst() {
tfInit();
FstWriter* fw = new FstWriter;
Performance_fstWriteRecords(fw);
delete fw;
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024);
if (fr->init()) { printf("success to init fst read"); }
Performance_fstReadRecords(fr);
tfCleanup();
delete fr;
}
void checkFstLongTerm() {
tfInit();
FstWriter* fw = new FstWriter;
// Performance_fstWriteRecords(fw);
fw->Put("A B", 1);
fw->Put("C", 2);
fw->Put("a", 3);
delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
delete m;
return;
}
{
uint64_t val = 0;
if (m->Get("A B", &val)) {
std::cout << "success to Get: " << val << std::endl;
} else {
std::cout << "failed to Get:" << val << std::endl;
}
}
{
uint64_t val = 0;
if (m->Get("C", &val)) {
std::cout << "success to Get: " << val << std::endl;
} else {
std::cout << "failed to Get:" << val << std::endl;
}
}
{
uint64_t val = 0;
if (m->Get("a", &val)) {
std::cout << "success to Get: " << val << std::endl;
} else {
std::cout << "failed to Get:" << val << std::endl;
}
}
// prefix search
// std::vector<uint64_t> result;
// AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
// m->Search(ctx, result);
// std::cout << "size: " << result.size() << std::endl;
// assert(result.size() == count);
// for (int i = 0; i < result.size(); i++) {
// assert(result[i] == i); // check result
//}
tfCleanup();
// free(ctx);
// delete m;
}
void checkFstCheckIterator() {
tfInit();
FstWriter* fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int count = 2;
Performance_fstWriteRecords(fw);
// Performance_fstWriteRecords(fw);
int64_t e = taosGetTimestampUs();
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
fw->Put("Hello world", 1);
fw->Put("hello world", 2);
fw->Put("hello worle", 3);
fw->Put("hello worlf", 4);
delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64);
......@@ -171,7 +275,7 @@ void checkFstCheckIterator() {
void fst_get(Fst* fst) {
for (int i = 0; i < 10000; i++) {
std::string term = "Hello";
std::string term = "Hello World";
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
uint64_t offset = 0;
bool ret = fstGet(fst, &key, &offset);
......@@ -189,7 +293,7 @@ void validateTFile(char* arg) {
std::thread threads[NUM_OF_THREAD];
// std::vector<std::thread> threads;
TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1");
TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1");
for (int i = 0; i < NUM_OF_THREAD; i++) {
threads[i] = std::thread(fst_get, reader->fst);
......@@ -203,9 +307,12 @@ void validateTFile(char* arg) {
tfCleanup();
}
int main(int argc, char* argv[]) {
if (argc > 1) { validateTFile(argv[1]); }
// tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); }
// checkFstCheckIterator();
// checkFstLongTerm();
// checkFstPrefixSearch();
checkMillonWriteAndReadOfFst();
return 1;
}
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
......@@ -457,7 +458,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
// taosArrayPush(data, &v4);
fObj->Put(data);
for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); }
for (size_t i = 0; i < taosArrayGetSize(data); i++) {
// data
destroyTFileValue(taosArrayGetP(data, i));
}
taosArrayDestroy(data);
std::string colName("voltage");
......@@ -470,6 +474,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
fObj->Get(&query, result);
assert(taosArrayGetSize(result) == 200);
indexTermDestroy(term);
taosArrayDestroy(result);
// tfileWriterDestroy(twrite);
}
......@@ -477,7 +482,7 @@ class CacheObj {
public:
CacheObj() {
// TODO
cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY);
cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY);
}
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
int ret = indexCachePut(cache, term, uid);
......@@ -534,6 +539,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
// indexTermDestry(term);
}
{
......@@ -541,24 +547,28 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
coj->Debug();
std::cout << "--------first----------" << std::endl;
......@@ -567,12 +577,14 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v4");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++);
indexTermDestroy(term);
}
coj->Debug();
std::cout << "--------second----------" << std::endl;
......@@ -583,6 +595,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
}
coj->Debug();
......@@ -598,6 +611,9 @@ TEST_F(IndexCacheEnv, cache_test) {
coj->Get(&query, colId, 10000, ret, &valType);
std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
assert(taosArrayGetSize(ret) == 4);
taosArrayDestroy(ret);
indexTermDestroy(term);
}
{
std::string colVal("v2");
......@@ -609,6 +625,9 @@ TEST_F(IndexCacheEnv, cache_test) {
coj->Get(&query, colId, 10000, ret, &valType);
assert(taosArrayGetSize(ret) == 1);
taosArrayDestroy(ret);
indexTermDestroy(term);
}
}
class IndexObj {
......@@ -620,7 +639,7 @@ class IndexObj {
indexInit();
}
int Init(const std::string& dir) {
taosRemoveDir(dir.c_str());
// taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
......@@ -645,10 +664,11 @@ class IndexObj {
int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
std::string tColVal = colVal;
size_t colValSize = tColVal.size();
for (int i = 0; i < numOfTable; i++) {
tColVal[tColVal.size() - 1] = 'a' + i % 26;
tColVal[i % colValSize] = 'a' + i % 26;
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
tColVal.c_str(), tColVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 10; i++) {
......@@ -677,14 +697,23 @@ class IndexObj {
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
return taosArrayGetSize(result);
int64_t s = taosGetTimestampUs();
if (Search(mq, result) == 0) {
int64_t e = taosGetTimestampUs();
std::cout << "search one successfully and time cost:" << e - s << std::endl;
} else {
}
int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq);
taosArrayDestroy(result);
return sz;
// assert(taosArrayGetSize(result) == targetSize);
}
void PutOne(const std::string& colName, const std::string& colVal) {
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
Put(terms, 10);
indexMultiTermDestroy(terms);
......@@ -783,18 +812,21 @@ TEST_F(IndexEnv2, testIndexOpen) {
index->Search(mq, result);
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
assert(taosArrayGetSize(result) == 400);
taosArrayDestroy(result);
indexMultiTermQueryDestroy(mq);
}
}
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp/test";
std::string path = "/tmp/testxxx";
if (index->Init(path) != 0) {
// r
std::cout << "failed to init" << std::endl;
}
int numOfTable = 100 * 10000;
index->WriteMillonData("tag1", "Hello", numOfTable);
int target = index->SearchOne("tag1", "Hello");
index->WriteMillonData("tag1", "Hello Wolrd", numOfTable);
int target = index->SearchOne("tag1", "Hello Wolrd");
std::cout << "Get Index: " << target << std::endl;
assert(numOfTable == target);
}
......@@ -802,6 +834,10 @@ static void write_and_search(IndexObj* idx) {
std::string colName("tag1"), colVal("Hello");
int target = idx->SearchOne("tag1", "Hello");
std::cout << "search: " << target << std::endl;
target = idx->SearchOne("tag2", "Test");
std::cout << "search: " << target << std::endl;
idx->PutOne(colName, colVal);
}
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
......@@ -809,7 +845,10 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
if (index->Init(path) != 0) {
// opt
}
index->WriteMultiMillonData("tag1", "Hello", 200000);
index->PutOne("tag1", "Hello");
index->PutOne("tag2", "Test");
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000);
index->WriteMultiMillonData("tag2", "Test", 50 * 10000);
std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) {
......@@ -821,25 +860,17 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
threads[i].join();
}
}
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_multi_thread_read) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp";
std::string path = "/tmp/test1";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_performance) {
std::string path = "/tmp";
std::string path = "/tmp/test2";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp";
std::string path = "/tmp/test3";
if (index->Init(path) != 0) {}
}
......@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util catalog function transport qcom
)
ADD_SUBDIRECTORY(test)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
......@@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type
* @return
*/
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen);
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
......
......@@ -392,7 +392,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
%type create_stable_args{SCreateTableSql*}
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
......
......@@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
taosArrayDestroy(pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(&pInfo->sub);
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
......
......@@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
SArray* array = NULL;
SName name = {0};
SName name = {0};
tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
......@@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
pEpSet->numOfEps = info->numOfEps;
pEpSet->inUse = info->inUse;
for(int32_t i = 0; i < pEpSet->numOfEps; ++i) {
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
pEpSet->port[i] = info->epAddr[i].port;
}
......@@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
val = htonl(pCreate->numOfVgroups);
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
}
return TSDB_CODE_SUCCESS;
......@@ -326,8 +326,7 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info;
} SVgroupTablesBatch;
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len,
SEpSet* pEpSet) {
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long";
......@@ -359,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL;
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
assert(pSuperTableMeta != NULL);
// too long tag values will return invalid sql, not be truncated automatically
......@@ -468,7 +471,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pValList, i);
SToken* pItem = taosArrayGet(pValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
......@@ -481,7 +484,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
tdSortKVRowByColIdx(row);
......@@ -501,15 +504,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
req.ctbCfg.suid = pSuperTableMeta->suid;
req.ctbCfg.pTag = row;
// pEpSet->inUse = info.inUse;
// pEpSet->numOfEps = info.numOfEps;
// for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
// pEpSet->port[i] = info.epAddr[i].port;
// tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
// }
// ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
// ((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
SVgroupTablesBatch *pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = info;
......@@ -518,22 +513,55 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
taosArrayPush(tBatch.req.pArray, &req);
taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
} else { // add to the correct vgroup
assert(info.vgId == pTableBatch->info.vgId);
taosArrayPush(pTableBatch->req.pArray, &req);
}
}
// TODO: serialize and
void *pBuf = NULL;
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) break;
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
} while (true);
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = pStmtInfo;
*len = sizeof(SInsertStmtInfo);
return TSDB_CODE_SUCCESS;
}
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf,
int32_t msgBufLen) {
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
int32_t code = 0;
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo));
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
......@@ -550,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pUser->passwd;
if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
}
if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
}
if (pInfo->type == TSDB_SQL_CREATE_USER) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
} else {
if (pUser->type == TSDB_ALTER_USER_PASSWD) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
} else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
assert(pPwd->type == TSDB_DATA_TYPE_NULL);
......@@ -575,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) {
// pCmd->count = 2;
} else {
return buildInvalidOperationMsg(pMsgBuf, msg4);
code = buildInvalidOperationMsg(pMsgBuf, msg4);
goto _error;
}
} else {
return buildInvalidOperationMsg(pMsgBuf, msg1);
code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
}
}
......@@ -597,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
}
if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
}
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
......@@ -615,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
} else {
return buildInvalidOperationMsg(pMsgBuf, msg1);
code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
}
}
......@@ -634,7 +672,11 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_SHOW: {
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf);
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW;
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW;
break;
}
......@@ -643,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg);
code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
}
SName n = {0};
int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n);
if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg);
code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
}
SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
......@@ -668,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt);
if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
}
char buf[TSDB_DB_NAME_LEN] = {0};
SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf));
if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
}
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
pDcl->pMsg = (char*)pCreateMsg;
......@@ -698,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SName name = {0};
code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n);
if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
}
SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
......@@ -710,25 +758,21 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
pDcl->msgType = TDMT_MND_DROP_DB;
pDcl->msgLen = sizeof(SDropDbMsg);
pDcl->pMsg = (char*)pDropDbMsg;
return TSDB_CODE_SUCCESS;
break;
}
case TSDB_SQL_CREATE_TABLE: {
case TSDB_SQL_CREATE_STABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code;
terrno = code;
goto _error;
}
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) !=
TSDB_CODE_SUCCESS) {
return code;
}
pDcl->msgType = TDMT_VND_CREATE_TABLE;
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
......@@ -740,7 +784,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_TABLE: {
pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
if (pDcl->pMsg == NULL) {
code = terrno;
goto _error;
}
pDcl->msgType = TDMT_MND_DROP_STB;
......@@ -750,7 +794,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_CREATE_DNODE: {
pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) {
code = terrno;
goto _error;
}
pDcl->msgType = TDMT_MND_CREATE_DNODE;
......@@ -760,7 +804,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_DNODE: {
pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) {
code = terrno;
goto _error;
}
pDcl->msgType = TDMT_MND_DROP_DNODE;
......@@ -771,5 +815,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
break;
}
return code;
return pDcl;
_error:
terrno = code;
tfree(pDcl);
return NULL;
}
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
SInsertStmtInfo* pInsertStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) {
tfree(pInsertStmt);
return NULL;
}
return pInsertStmt;
}
\ No newline at end of file
......@@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) {
}
bool qIsDdlQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type;
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type;
}
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
......@@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
}
if (!isDqlSqlStatement(&info)) {
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo));
if (NULL == pDcl) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code.
return terrno;
bool toVnode = false;
if (info.type == TSDB_SQL_CREATE_TABLE) {
SCreateTableSql* pCreateSql = info.pCreateTableInfo;
if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
toVnode = true;
}
}
pDcl->nodeType = info.type;
int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
if (toVnode) {
SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pInsertInfo == NULL) {
return terrno;
}
*pQuery = (SQueryNode*) pInsertInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {
return terrno;
}
*pQuery = (SQueryNode*)pDcl;
pDcl->nodeType = info.type;
}
} else {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
......
此差异已折叠。
......@@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) {
SSqlInfo info1 = doGenerateAST(sql1);
ASSERT_EQ(info1.valid, true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
ASSERT_EQ(code, 0);
SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_NE(output, nullptr);
// convert the show command to be the select query
// select name, privilege, create_time, account from information_schema.users;
......@@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) {
ASSERT_EQ(info1.valid, true);
ASSERT_EQ(isDclSqlStatement(&info1), true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
ASSERT_EQ(code, 0);
SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_NE(output, nullptr);
destroySqlInfo(&info1);
}
\ No newline at end of file
......@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util catalog cjson parser transport function qcom
)
ADD_SUBDIRECTORY(test)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
......@@ -40,7 +40,7 @@ extern "C" {
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
#define QNODE_INSERT 15
#define QNODE_MODIFY 15
typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not
......@@ -70,6 +70,11 @@ typedef struct SQueryPlanNode {
struct SQueryPlanNode *pParent;
} SQueryPlanNode;
typedef struct SDataPayloadInfo {
int32_t msgType;
SArray *payload;
} SDataPayloadInfo;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
......
......@@ -37,15 +37,29 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0;
}
int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) {
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) {
SDataPayloadInfo* pPayload = calloc(1, sizeof(SDataPayloadInfo));
if (NULL == *pQueryPlan || NULL == blocks || NULL == pPayload) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pQueryPlan)->info.type = QNODE_INSERT;
(*pQueryPlan)->info.type = QNODE_MODIFY;
taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks;
if (pNode->type == TSDB_SQL_INSERT) {
pPayload->msgType = TDMT_VND_SUBMIT;
} else if (pNode->type == TSDB_SQL_CREATE_TABLE) {
pPayload->msgType = TDMT_VND_CREATE_TABLE;
}
pPayload->payload = blocks;
(*pQueryPlan)->pExtInfo = pPayload;
return TSDB_CODE_SUCCESS;
}
......@@ -62,13 +76,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
case TSDB_SQL_SELECT: {
return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan);
}
case TSDB_SQL_INSERT:
return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan);
case TSDB_SQL_CREATE_TABLE:
return createModificationOpPlan(pNode, pQueryPlan);
default:
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
......
......@@ -34,7 +34,7 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME
};
static void* vailidPointer(void* p) {
static void* validPointer(void* p) {
if (NULL == p) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
......@@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) {
}
static SDataSink* initDataSink(int32_t type, int32_t size) {
SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size));
SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type);
return sink;
......@@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
}
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size));
SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
node->info.type = type;
node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
......@@ -184,27 +184,30 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
}
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan)));
SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId);
subplan->type = type;
subplan->level = 0;
if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
if (NULL == pCxt->pCurrentSubplan->pChildren) {
pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
}
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
}
SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
} else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
}
taosArrayPush(currentLevel, &subplan);
pCxt->pCurrentSubplan = subplan;
++(pCxt->pDag->numOfSubplans);
......@@ -272,12 +275,13 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode);
break;
case QNODE_INSERT:
case QNODE_MODIFY:
// Insert is not an operator in a physical plan.
break;
default:
assert(false);
}
if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
size_t size = taosArrayGetSize(pPlanNode->pChildren);
......@@ -287,31 +291,38 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
taosArrayPush(node->pChildren, &child);
}
}
return node;
}
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
size_t numOfVg = taosArrayGetSize(vgs);
for (int32_t i = 0; i < numOfVg; ++i) {
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
for (int32_t i = 0; i < numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
subplan->pNode = NULL;
subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->type = QUERY_TYPE_MODIFY;
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
}
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_INSERT == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot);
if (QNODE_MODIFY == pRoot->info.type) {
splitModificationOpSubPlan(pCxt, pRoot);
} else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
++(pCxt->nextId.templateId);
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
}
// todo deal subquery
......@@ -321,12 +332,13 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = {
.pCatalog = pCatalog,
.pDag = vailidPointer(calloc(1, sizeof(SQueryDag))),
.pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL,
.nextId = {0} // todo queryid
};
*pDag = context.pDag;
context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode);
} CATCH(code) {
CLEANUP_EXECUTE();
......
......@@ -793,7 +793,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
return NULL;
}
// The 'type', 'level', 'execEpSet', 'pChildern' and 'pParents' fields do not need to be serialized.
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
if (res) {
......@@ -835,7 +835,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
SDataInserter* insert = (SDataInserter*)(subplan->pDataSink);
*len = insert->size;
*str = insert->pData;
insert->pData == NULL;
insert->pData = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -844,6 +844,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
*str = cJSON_Print(json);
*len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS;
......
......@@ -31,17 +31,20 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
destroyQueryPlan(logicPlan);
return code;
}
code = optimizeQueryPlan(logicPlan);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
return code;
}
code = createDag(logicPlan, NULL, pDag);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
qDestroyQueryDag(*pDag);
return code;
}
destroyQueryPlan(logicPlan);
return TSDB_CODE_SUCCESS;
}
......
......@@ -7,8 +7,10 @@ target_include_directories(
)
target_link_libraries(
qcom
PRIVATE os util transport
qcom
PRIVATE os util transport
)
ADD_SUBDIRECTORY(test)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
......@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport planner qcom
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -12,4 +12,6 @@ target_link_libraries(
PRIVATE os util planner qcom common catalog transport
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -28,7 +28,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t m = 0; m < level->taskNum; ++m) {
SSchTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan;
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
if (childNum > 0) {
......@@ -40,7 +40,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan **child = taosArrayGet(plan->pChildern, n);
SSubplan **child = taosArrayGet(plan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -122,6 +122,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
//??
job->attr.needFetch = true;
job->levelNum = levelNum;
......@@ -478,15 +479,27 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
// job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
}
}
case TDMT_VND_SUBMIT_RSP: {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
......@@ -570,22 +583,29 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return:
tfree(param);
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
......@@ -593,6 +613,9 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTableCallback;
break;
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
......@@ -673,6 +696,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t code = 0;
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
......@@ -789,19 +813,15 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
// int32_t msgType = (plan->type == QUERY_TYPE_MODIFY)? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task));
task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJob(SSchJob *job) {
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) {
......
......@@ -59,7 +59,8 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan.execNode.inUse = 0;
scanPlan.execNode.epAddr[0].port = 6030;
strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0");
scanPlan.pChildern = NULL;
scanPlan.pChildren = NULL;
scanPlan.level = 1;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
......@@ -69,14 +70,14 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execNode.numOfEps = 0;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildern, &scanPointer);
taosArrayPush(mergePointer->pChildren, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(dag->pSubplans, &merge);
......@@ -103,7 +104,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].execNode.inUse = 0;
insertPlan[0].execNode.epAddr[0].port = 6030;
strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
insertPlan[0].pChildern = NULL;
insertPlan[0].pChildren = NULL;
insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
......@@ -118,7 +119,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].execNode.inUse = 1;
insertPlan[1].execNode.epAddr[0].port = 6030;
strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
insertPlan[1].pChildern = NULL;
insertPlan[1].pChildren = NULL;
insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
......
......@@ -14,4 +14,7 @@ target_link_libraries(
PUBLIC api
)
ADD_SUBDIRECTORY(test)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "encode.h"
#if __STDC_VERSION__ >= 201112L
static_assert(sizeof(float) == sizeof(uint32_t), "sizeof(float) must equal to sizeof(uint32_t)");
static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) must equal to sizeof(uint64_t)");
#endif
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type) {
if (type == TD_ENCODER) {
if (data == NULL) size = 0;
} else {
ASSERT(data && size > 0);
}
pCoder->type = type;
pCoder->endian = endian;
pCoder->data = data;
pCoder->size = size;
pCoder->pos = 0;
tFreeListInit(&(pCoder->fl));
TD_SLIST_INIT(&(pCoder->stack));
}
void tCoderClear(SCoder* pCoder) {
tFreeListClear(&(pCoder->fl));
struct SCoderNode* pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pCoder->stack));
if (pNode == NULL) break;
TD_SLIST_POP(&(pCoder->stack));
free(pNode);
}
}
int tStartEncode(SCoder* pCoder) {
struct SCoderNode* pNode;
ASSERT(pCoder->type == TD_ENCODER);
if (pCoder->data) {
if (pCoder->size - pCoder->pos < sizeof(int32_t)) return -1;
pNode = malloc(sizeof(*pNode));
if (pNode == NULL) return -1;
pNode->data = pCoder->data;
pNode->pos = pCoder->pos;
pNode->size = pCoder->size;
pCoder->data = pNode->data + pNode->pos + sizeof(int32_t);
pCoder->pos = 0;
pCoder->size = pNode->size - pNode->pos - sizeof(int32_t);
TD_SLIST_PUSH(&(pCoder->stack), pNode);
} else {
pCoder->pos += sizeof(int32_t);
}
return 0;
}
void tEndEncode(SCoder* pCoder) {
struct SCoderNode* pNode;
int32_t len;
ASSERT(pCoder->type == TD_ENCODER);
if (pCoder->data) {
pNode = TD_SLIST_HEAD(&(pCoder->stack));
ASSERT(pNode);
TD_SLIST_POP(&(pCoder->stack));
len = pCoder->pos;
pCoder->data = pNode->data;
pCoder->size = pNode->size;
pCoder->pos = pNode->pos;
if (TD_RT_ENDIAN() == pCoder->endian) {
tPut(int32_t, pCoder->data + pCoder->pos, len);
} else {
tRPut32(pCoder->data + pCoder->pos, len);
}
TD_CODER_MOVE_POS(pCoder, len + sizeof(int32_t));
free(pNode);
}
}
int tStartDecode(SCoder* pCoder) {
int32_t len;
struct SCoderNode* pNode;
ASSERT(pCoder->type == TD_DECODER);
if (tDecodeI32(pCoder, &len) < 0) return -1;
pNode = malloc(sizeof(*pNode));
if (pNode == NULL) return -1;
pNode->data = pCoder->data;
pNode->pos = pCoder->pos;
pNode->size = pCoder->size;
pCoder->data = pNode->data + pNode->pos;
pCoder->size = len;
pCoder->pos = 0;
TD_SLIST_PUSH(&(pCoder->stack), pNode);
return 0;
}
void tEndDecode(SCoder* pCoder) {
struct SCoderNode* pNode;
ASSERT(pCoder->type == TD_DECODER);
ASSERT(tDecodeIsEnd(pCoder));
pNode = TD_SLIST_HEAD(&(pCoder->stack));
ASSERT(pNode);
TD_SLIST_POP(&(pCoder->stack));
pCoder->data = pNode->data;
pCoder->size = pNode->size;
pCoder->pos = pCoder->pos + pNode->pos;
free(pNode);
}
......@@ -41,4 +41,8 @@ target_sources(freelistTest
)
target_link_libraries(freelistTest os util gtest gtest_main)
# encodeTest
add_executable(encodeTest "encodeTest.cpp")
target_link_libraries(encodeTest os util gtest gtest_main)
#include <iostream>
#include "gtest/gtest.h"
#include "encode.h"
#define BUF_SIZE 64
td_endian_t endian_arr[2] = {TD_LITTLE_ENDIAN, TD_BIG_ENDIAN};
static int encode(SCoder *pCoder, int8_t val) { return tEncodeI8(pCoder, val); }
static int encode(SCoder *pCoder, uint8_t val) { return tEncodeU8(pCoder, val); }
static int encode(SCoder *pCoder, int16_t val) { return tEncodeI16(pCoder, val); }
static int encode(SCoder *pCoder, uint16_t val) { return tEncodeU16(pCoder, val); }
static int encode(SCoder *pCoder, int32_t val) { return tEncodeI32(pCoder, val); }
static int encode(SCoder *pCoder, uint32_t val) { return tEncodeU32(pCoder, val); }
static int encode(SCoder *pCoder, int64_t val) { return tEncodeI64(pCoder, val); }
static int encode(SCoder *pCoder, uint64_t val) { return tEncodeU64(pCoder, val); }
static int decode(SCoder *pCoder, int8_t *val) { return tDecodeI8(pCoder, val); }
static int decode(SCoder *pCoder, uint8_t *val) { return tDecodeU8(pCoder, val); }
static int decode(SCoder *pCoder, int16_t *val) { return tDecodeI16(pCoder, val); }
static int decode(SCoder *pCoder, uint16_t *val) { return tDecodeU16(pCoder, val); }
static int decode(SCoder *pCoder, int32_t *val) { return tDecodeI32(pCoder, val); }
static int decode(SCoder *pCoder, uint32_t *val) { return tDecodeU32(pCoder, val); }
static int decode(SCoder *pCoder, int64_t *val) { return tDecodeI64(pCoder, val); }
static int decode(SCoder *pCoder, uint64_t *val) { return tDecodeU64(pCoder, val); }
static int encodev(SCoder *pCoder, int8_t val) { return tEncodeI8(pCoder, val); }
static int encodev(SCoder *pCoder, uint8_t val) { return tEncodeU8(pCoder, val); }
static int encodev(SCoder *pCoder, int16_t val) { return tEncodeI16v(pCoder, val); }
static int encodev(SCoder *pCoder, uint16_t val) { return tEncodeU16v(pCoder, val); }
static int encodev(SCoder *pCoder, int32_t val) { return tEncodeI32v(pCoder, val); }
static int encodev(SCoder *pCoder, uint32_t val) { return tEncodeU32v(pCoder, val); }
static int encodev(SCoder *pCoder, int64_t val) { return tEncodeI64v(pCoder, val); }
static int encodev(SCoder *pCoder, uint64_t val) { return tEncodeU64v(pCoder, val); }
static int decodev(SCoder *pCoder, int8_t *val) { return tDecodeI8(pCoder, val); }
static int decodev(SCoder *pCoder, uint8_t *val) { return tDecodeU8(pCoder, val); }
static int decodev(SCoder *pCoder, int16_t *val) { return tDecodeI16v(pCoder, val); }
static int decodev(SCoder *pCoder, uint16_t *val) { return tDecodeU16v(pCoder, val); }
static int decodev(SCoder *pCoder, int32_t *val) { return tDecodeI32v(pCoder, val); }
static int decodev(SCoder *pCoder, uint32_t *val) { return tDecodeU32v(pCoder, val); }
static int decodev(SCoder *pCoder, int64_t *val) { return tDecodeI64v(pCoder, val); }
static int decodev(SCoder *pCoder, uint64_t *val) { return tDecodeU64v(pCoder, val); }
template <typename T>
static void simple_encode_decode_func(bool var_len) {
uint8_t buf[BUF_SIZE];
SCoder coder;
T min_val, max_val;
T step = 1;
if (typeid(T) == typeid(int8_t)) {
min_val = INT8_MIN;
max_val = INT8_MAX;
step = 1;
} else if (typeid(T) == typeid(uint8_t)) {
min_val = 0;
max_val = UINT8_MAX;
step = 1;
} else if (typeid(T) == typeid(int16_t)) {
min_val = INT16_MIN;
max_val = INT16_MAX;
step = 1;
} else if (typeid(T) == typeid(uint16_t)) {
min_val = 0;
max_val = UINT16_MAX;
step = 1;
} else if (typeid(T) == typeid(int32_t)) {
min_val = INT32_MIN;
max_val = INT32_MAX;
step = ((T)1) << 16;
} else if (typeid(T) == typeid(uint32_t)) {
min_val = 0;
max_val = UINT32_MAX;
step = ((T)1) << 16;
} else if (typeid(T) == typeid(int64_t)) {
min_val = INT64_MIN;
max_val = INT64_MAX;
step = ((T)1) << 48;
} else if (typeid(T) == typeid(uint64_t)) {
min_val = 0;
max_val = UINT64_MAX;
step = ((T)1) << 48;
}
T i = min_val;
for (;; /*T i = min_val; i <= max_val; i += step*/) {
T dval;
// Encode NULL
for (td_endian_t endian : endian_arr) {
tCoderInit(&coder, endian, NULL, 0, TD_ENCODER);
if (var_len) {
GTEST_ASSERT_EQ(encodev(&coder, i), 0);
} else {
GTEST_ASSERT_EQ(encode(&coder, i), 0);
GTEST_ASSERT_EQ(coder.pos, sizeof(T));
}
tCoderClear(&coder);
}
// Encode and decode
for (td_endian_t e_endian : endian_arr) {
for (td_endian_t d_endian : endian_arr) {
// Encode
tCoderInit(&coder, e_endian, buf, BUF_SIZE, TD_ENCODER);
if (var_len) {
GTEST_ASSERT_EQ(encodev(&coder, i), 0);
} else {
GTEST_ASSERT_EQ(encode(&coder, i), 0);
GTEST_ASSERT_EQ(coder.pos, sizeof(T));
}
int32_t epos = coder.pos;
tCoderClear(&coder);
// Decode
tCoderInit(&coder, d_endian, buf, BUF_SIZE, TD_DECODER);
if (var_len) {
GTEST_ASSERT_EQ(decodev(&coder, &dval), 0);
} else {
GTEST_ASSERT_EQ(decode(&coder, &dval), 0);
GTEST_ASSERT_EQ(coder.pos, sizeof(T));
}
GTEST_ASSERT_EQ(coder.pos, epos);
if (typeid(T) == typeid(int8_t) || typeid(T) == typeid(uint8_t) || e_endian == d_endian) {
GTEST_ASSERT_EQ(i, dval);
}
tCoderClear(&coder);
}
}
if (i == max_val) break;
if (max_val - i < step) {
i = max_val;
} else {
i = i + step;
}
}
}
TEST(td_encode_test, encode_decode_fixed_len_integer) {
simple_encode_decode_func<int8_t>(false);
simple_encode_decode_func<uint8_t>(false);
simple_encode_decode_func<int16_t>(false);
simple_encode_decode_func<uint16_t>(false);
simple_encode_decode_func<int32_t>(false);
simple_encode_decode_func<uint32_t>(false);
simple_encode_decode_func<int64_t>(false);
simple_encode_decode_func<uint64_t>(false);
}
TEST(td_encode_test, encode_decode_variant_len_integer) {
simple_encode_decode_func<int16_t>(true);
simple_encode_decode_func<uint16_t>(true);
simple_encode_decode_func<int32_t>(true);
simple_encode_decode_func<uint32_t>(true);
simple_encode_decode_func<int64_t>(true);
simple_encode_decode_func<uint64_t>(true);
}
TEST(td_encode_test, encode_decode_cstr) {
uint8_t * buf = new uint8_t[1024 * 1024];
char * cstr = new char[1024 * 1024];
const char *dcstr;
SCoder encoder;
SCoder decoder;
for (size_t i = 0; i < 1024 * 2 - 1; i++) {
memset(cstr, 'a', i);
cstr[i] = '\0';
for (td_endian_t endian : endian_arr) {
// Encode
tCoderInit(&encoder, endian, buf, 1024 * 1024, TD_ENCODER);
GTEST_ASSERT_EQ(tEncodeCStr(&encoder, cstr), 0);
tCoderClear(&encoder);
// Decode
tCoderInit(&decoder, endian, buf, 1024 * 1024, TD_DECODER);
GTEST_ASSERT_EQ(tDecodeCStr(&decoder, &dcstr), 0);
GTEST_ASSERT_EQ(memcmp(dcstr, cstr, i + 1), 0);
tCoderClear(&decoder);
}
}
delete buf;
delete cstr;
}
typedef struct {
int32_t A_a;
int64_t A_b;
char * A_c;
} SStructA_v1;
static int tSStructA_v1_encode(SCoder *pCoder, const SStructA_v1 *pSAV1) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pSAV1->A_a) < 0) return -1;
if (tEncodeI64(pCoder, pSAV1->A_b) < 0) return -1;
if (tEncodeCStr(pCoder, pSAV1->A_c) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static int tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pSAV1->A_a) < 0) return -1;
if (tDecodeI64(pCoder, &pSAV1->A_b) < 0) return -1;
const char *tstr;
uint64_t len;
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
pSAV1->A_c = (char *)TCODER_MALLOC(len + 1, pCoder);
memcpy(pSAV1->A_c, tstr, len + 1);
tEndDecode(pCoder);
return 0;
}
typedef struct {
int32_t A_a;
int64_t A_b;
char * A_c;
// -------------------BELOW FEILDS ARE ADDED IN A NEW VERSION--------------
int16_t A_d;
int16_t A_e;
} SStructA_v2;
static int tSStructA_v2_encode(SCoder *pCoder, const SStructA_v2 *pSAV2) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pSAV2->A_a) < 0) return -1;
if (tEncodeI64(pCoder, pSAV2->A_b) < 0) return -1;
if (tEncodeCStr(pCoder, pSAV2->A_c) < 0) return -1;
// ------------------------NEW FIELDS ENCODE-------------------------------
if (tEncodeI16(pCoder, pSAV2->A_d) < 0) return -1;
if (tEncodeI16(pCoder, pSAV2->A_e) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static int tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pSAV2->A_a) < 0) return -1;
if (tDecodeI64(pCoder, &pSAV2->A_b) < 0) return -1;
const char *tstr;
uint64_t len;
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
pSAV2->A_c = (char *)TCODER_MALLOC(len + 1, pCoder);
memcpy(pSAV2->A_c, tstr, len + 1);
// ------------------------NEW FIELDS DECODE-------------------------------
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI16(pCoder, &pSAV2->A_d) < 0) return -1;
if (tDecodeI16(pCoder, &pSAV2->A_e) < 0) return -1;
} else {
pSAV2->A_d = 0;
pSAV2->A_e = 0;
}
tEndDecode(pCoder);
return 0;
}
typedef struct {
SStructA_v1 *pA;
int32_t v_a;
int8_t v_b;
} SFinalReq_v1;
static int tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) {
if (tStartEncode(pCoder) < 0) return -1;
if (tSStructA_v1_encode(pCoder, ps1->pA) < 0) return -1;
if (tEncodeI32(pCoder, ps1->v_a) < 0) return -1;
if (tEncodeI8(pCoder, ps1->v_b) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static int tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) {
if (tStartDecode(pCoder) < 0) return -1;
ps1->pA = (SStructA_v1 *)TCODER_MALLOC(sizeof(*(ps1->pA)), pCoder);
if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1;
if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1;
if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
typedef struct {
SStructA_v2 *pA;
int32_t v_a;
int8_t v_b;
// ----------------------- Feilds added -----------------------
int16_t v_c;
} SFinalReq_v2;
static int tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) {
if (tStartEncode(pCoder) < 0) return -1;
if (tSStructA_v2_encode(pCoder, ps2->pA) < 0) return -1;
if (tEncodeI32(pCoder, ps2->v_a) < 0) return -1;
if (tEncodeI8(pCoder, ps2->v_b) < 0) return -1;
// ----------------------- Feilds added encode -----------------------
if (tEncodeI16(pCoder, ps2->v_c) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static int tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) {
if (tStartDecode(pCoder) < 0) return -1;
ps2->pA = (SStructA_v2 *)TCODER_MALLOC(sizeof(*(ps2->pA)), pCoder);
if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1;
if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1;
if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1;
// ----------------------- Feilds added decode -----------------------
if (tDecodeIsEnd(pCoder)) {
ps2->v_c = 0;
} else {
if (tDecodeI16(pCoder, &ps2->v_c) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
TEST(td_encode_test, compound_struct_encode_test) {
SCoder encoder, decoder;
uint8_t * buf1;
int32_t buf1size;
uint8_t * buf2;
int32_t buf2size;
SStructA_v1 sa1 = {.A_a = 10, .A_b = 65478, .A_c = "Hello"};
SStructA_v2 sa2 = {.A_a = 10, .A_b = 65478, .A_c = "Hello", .A_d = 67, .A_e = 13};
SFinalReq_v1 req1 = {.pA = &sa1, .v_a = 15, .v_b = 35};
SFinalReq_v2 req2 = {.pA = &sa2, .v_a = 15, .v_b = 32, .v_c = 37};
SFinalReq_v1 dreq1;
SFinalReq_v2 dreq21, dreq22;
// Get size
tCoderInit(&encoder, TD_LITTLE_ENDIAN, nullptr, 0, TD_ENCODER);
GTEST_ASSERT_EQ(tSFinalReq_v1_encode(&encoder, &req1), 0);
buf1size = encoder.pos;
buf1 = new uint8_t[encoder.pos];
tCoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, nullptr, 0, TD_ENCODER);
GTEST_ASSERT_EQ(tSFinalReq_v2_encode(&encoder, &req2), 0);
buf2size = encoder.pos;
buf2 = new uint8_t[encoder.pos];
tCoderClear(&encoder);
// Encode
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_ENCODER);
GTEST_ASSERT_EQ(tSFinalReq_v1_encode(&encoder, &req1), 0);
tCoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_ENCODER);
GTEST_ASSERT_EQ(tSFinalReq_v2_encode(&encoder, &req2), 0);
tCoderClear(&encoder);
// Decode
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER);
GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq1), 0);
GTEST_ASSERT_EQ(dreq1.pA->A_a, req1.pA->A_a);
GTEST_ASSERT_EQ(dreq1.pA->A_b, req1.pA->A_b);
GTEST_ASSERT_EQ(strcmp(dreq1.pA->A_c, req1.pA->A_c), 0);
GTEST_ASSERT_EQ(dreq1.v_a, req1.v_a);
GTEST_ASSERT_EQ(dreq1.v_b, req1.v_b);
tCoderClear(&decoder);
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER);
GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq21), 0);
GTEST_ASSERT_EQ(dreq21.pA->A_a, req1.pA->A_a);
GTEST_ASSERT_EQ(dreq21.pA->A_b, req1.pA->A_b);
GTEST_ASSERT_EQ(strcmp(dreq21.pA->A_c, req1.pA->A_c), 0);
GTEST_ASSERT_EQ(dreq21.pA->A_d, 0);
GTEST_ASSERT_EQ(dreq21.pA->A_e, 0);
GTEST_ASSERT_EQ(dreq21.v_a, req1.v_a);
GTEST_ASSERT_EQ(dreq21.v_b, req1.v_b);
GTEST_ASSERT_EQ(dreq21.v_c, 0);
tCoderClear(&decoder);
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_DECODER);
GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq22), 0);
GTEST_ASSERT_EQ(dreq22.pA->A_a, req2.pA->A_a);
GTEST_ASSERT_EQ(dreq22.pA->A_b, req2.pA->A_b);
GTEST_ASSERT_EQ(strcmp(dreq22.pA->A_c, req2.pA->A_c), 0);
GTEST_ASSERT_EQ(dreq22.pA->A_d, req2.pA->A_d);
GTEST_ASSERT_EQ(dreq22.pA->A_e, req2.pA->A_e);
GTEST_ASSERT_EQ(dreq22.v_a, req2.v_a);
GTEST_ASSERT_EQ(dreq22.v_b, req2.v_b);
GTEST_ASSERT_EQ(dreq22.v_c, req2.v_c);
tCoderClear(&decoder);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册