提交 10952778 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/feature/qnode' into feature/3.0_wxy

...@@ -57,46 +57,6 @@ extern int tMsgDict[]; ...@@ -57,46 +57,6 @@ extern int tMsgDict[];
typedef uint16_t tmsg_t; typedef uint16_t tmsg_t;
/* ------------------------ ENCODE/DECODE FUNCTIONS AND MACROS ------------------------ */
struct SMEListNode {
TD_SLIST_NODE(SMEListNode);
SEncoder coder;
};
typedef struct SMsgEncoder {
SEncoder coder;
TD_SLIST(SMEListNode) eStack; // encode stack
} SMsgEncoder;
struct SMDFreeListNode {
TD_SLIST_NODE(SMDFreeListNode);
char payload[];
};
struct SMDListNode {
TD_SLIST_NODE(SMDListNode);
SDecoder coder;
};
typedef struct SMsgDecoder {
SDecoder coder;
TD_SLIST(SMDListNode) dStack;
TD_SLIST(SMDFreeListNode) freeList;
} SMsgDecoder;
#define TMSG_MALLOC(SIZE, DECODER) \
({ \
void* ptr = malloc((SIZE) + sizeof(struct SMDFreeListNode)); \
if (ptr) { \
TD_SLIST_PUSH(&((DECODER)->freeList), (struct SMDFreeListNode*)ptr); \
ptr = POINTER_SHIFT(ptr, sizeof(struct SMDFreeListNode*)); \
} \
ptr; \
})
void tmsgInitMsgDecoder(SMsgDecoder* pMD, td_endian_t endian, uint8_t* data, int64_t size);
void tmsgClearMsgDecoder(SMsgDecoder* pMD);
/* ------------------------ OTHER DEFINITIONS ------------------------ */ /* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type // IE type
#define TSDB_IE_TYPE_SEC 1 #define TSDB_IE_TYPE_SEC 1
...@@ -1090,6 +1050,7 @@ typedef struct SResFetchMsg { ...@@ -1090,6 +1050,7 @@ typedef struct SResFetchMsg {
} SResFetchMsg; } SResFetchMsg;
typedef struct SSchTasksStatusMsg { typedef struct SSchTasksStatusMsg {
SMsgHead header;
uint64_t sId; uint64_t sId;
} SSchTasksStatusMsg; } SSchTasksStatusMsg;
...@@ -1105,6 +1066,7 @@ typedef struct SSchedulerStatusRsp { ...@@ -1105,6 +1066,7 @@ typedef struct SSchedulerStatusRsp {
} SSchedulerStatusRsp; } SSchedulerStatusRsp;
typedef struct STaskCancelMsg { typedef struct STaskCancelMsg {
SMsgHead header;
uint64_t sId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
...@@ -1115,6 +1077,7 @@ typedef struct STaskCancelRsp { ...@@ -1115,6 +1077,7 @@ typedef struct STaskCancelRsp {
} STaskCancelRsp; } STaskCancelRsp;
typedef struct STaskDropMsg { typedef struct STaskDropMsg {
SMsgHead header;
uint64_t sId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
...@@ -1283,8 +1246,6 @@ typedef struct { ...@@ -1283,8 +1246,6 @@ typedef struct {
SArray* pArray; SArray* pArray;
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq);
int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq);
int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
......
...@@ -122,9 +122,10 @@ typedef struct SSubplanId { ...@@ -122,9 +122,10 @@ typedef struct SSubplanId {
typedef struct SSubplan { typedef struct SSubplan {
SSubplanId id; // unique id of the subplan SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t level; // the execution level of current subplan, starting from 0. int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SArray *pChildern; // the datasource subplan,from which to fetch the result SArray *pChildren; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan SPhyNode *pNode; // physical plan of current subplan
SDataSink *pDataSink; // data of the subplan flow into the datasink SDataSink *pDataSink; // data of the subplan flow into the datasink
...@@ -147,7 +148,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** ...@@ -147,7 +148,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
// @subplan subplan to be schedule // @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan // @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans // @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
......
此差异已折叠。
...@@ -11,4 +11,6 @@ target_link_libraries( ...@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util common transport parser planner catalog scheduler function qcom PRIVATE os util common transport parser planner catalog scheduler function qcom
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
\ No newline at end of file ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
...@@ -29,7 +29,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -29,7 +29,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
// this function may be called by user or system, or by both simultaneously. // this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) { void taos_cleanup(void) {
tscDebug("start to cleanup client environment"); tscInfo("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return; return;
...@@ -47,6 +47,8 @@ void taos_cleanup(void) { ...@@ -47,6 +47,8 @@ void taos_cleanup(void) {
rpcCleanup(); rpcCleanup();
taosCloseLog(); taosCloseLog();
tscInfo("all local resources released");
} }
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
...@@ -140,7 +142,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { ...@@ -140,7 +142,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
SRequestObj *pRequest = (SRequestObj *) pRes; SRequestObj *pRequest = (SRequestObj *) pRes;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) { pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS ||
taos_num_fields(pRes) == 0) {
return NULL; return NULL;
} }
......
...@@ -279,7 +279,7 @@ TEST(testCase, connect_Test) { ...@@ -279,7 +279,7 @@ TEST(testCase, connect_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// // assert(pConn != NULL); // // assert(pConn != NULL);
...@@ -470,9 +470,9 @@ TEST(testCase, create_multiple_tables) { ...@@ -470,9 +470,9 @@ TEST(testCase, create_multiple_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)"); pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
ASSERT_TRUE(false); ASSERT_TRUE(false);
} }
......
...@@ -12,4 +12,6 @@ target_link_libraries( ...@@ -12,4 +12,6 @@ target_link_libraries(
INTERFACE api INTERFACE api
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
...@@ -27,77 +27,6 @@ ...@@ -27,77 +27,6 @@
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
static int tmsgStartEncode(SMsgEncoder *pME);
static void tmsgEndEncode(SMsgEncoder *pME);
static int tmsgStartDecode(SMsgDecoder *pMD);
static void tmsgEndDecode(SMsgDecoder *pMD);
/* ------------------------ ENCODE/DECODE FUNCTIONS ------------------------ */
void tmsgInitMsgEncoder(SMsgEncoder *pME, td_endian_t endian, uint8_t *data, int64_t size) {
tInitEncoder(&(pME->coder), endian, data, size);
TD_SLIST_INIT(&(pME->eStack));
}
void tmsgClearMsgEncoder(SMsgEncoder *pME) {
struct SMEListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pME->eStack));
if (TD_IS_NULL(pNode)) break;
TD_SLIST_POP(&(pME->eStack));
free(pNode);
}
}
void tmsgInitMsgDecoder(SMsgDecoder *pMD, td_endian_t endian, uint8_t *data, int64_t size) {
tInitDecoder(&pMD->coder, endian, data, size);
TD_SLIST_INIT(&(pMD->dStack));
TD_SLIST_INIT(&(pMD->freeList));
}
void tmsgClearMsgDecoder(SMsgDecoder *pMD) {
{
struct SMDFreeListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pMD->freeList));
if (TD_IS_NULL(pNode)) break;
TD_SLIST_POP(&(pMD->freeList));
free(pNode);
}
}
{
struct SMDListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pMD->dStack));
if (TD_IS_NULL(pNode)) break;
TD_SLIST_POP(&(pMD->dStack));
free(pNode);
}
}
}
/* ------------------------ MESSAGE ENCODE/DECODE ------------------------ */
int tmsgSVCreateTbReqEncode(SMsgEncoder *pCoder, SVCreateTbReq *pReq) {
tmsgStartEncode(pCoder);
// TODO
tmsgEndEncode(pCoder);
return 0;
}
int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) {
tmsgStartDecode(pCoder);
// TODO: decode
// Decode is not end
if (pCoder->coder.pos != pCoder->coder.size) {
// Continue decode
}
tmsgEndDecode(pCoder);
return 0;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0; int tlen = 0;
...@@ -219,63 +148,3 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { ...@@ -219,63 +148,3 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
return buf; return buf;
} }
\ No newline at end of file
/* ------------------------ STATIC METHODS ------------------------ */
static int tmsgStartEncode(SMsgEncoder *pME) {
struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode));
if (TD_IS_NULL(pNode)) return -1;
pNode->coder = pME->coder;
TD_SLIST_PUSH(&(pME->eStack), pNode);
TD_CODER_MOVE_POS(&(pME->coder), sizeof(int32_t));
return 0;
}
static void tmsgEndEncode(SMsgEncoder *pME) {
int32_t size;
struct SMEListNode *pNode;
pNode = TD_SLIST_HEAD(&(pME->eStack));
ASSERT(pNode);
TD_SLIST_POP(&(pME->eStack));
size = pME->coder.pos - pNode->coder.pos;
tEncodeI32(&(pNode->coder), size);
free(pNode);
}
static int tmsgStartDecode(SMsgDecoder *pMD) {
struct SMDListNode *pNode;
int32_t size;
pNode = (struct SMDListNode *)malloc(sizeof(*pNode));
if (pNode == NULL) return -1;
tDecodeI32(&(pMD->coder), &size);
pNode->coder = pMD->coder;
TD_SLIST_PUSH(&(pMD->dStack), pNode);
pMD->coder.pos = 0;
pMD->coder.size = size - sizeof(int32_t);
pMD->coder.data = TD_CODER_CURRENT(&(pNode->coder));
return 0;
}
static void tmsgEndDecode(SMsgDecoder *pMD) {
ASSERT(pMD->coder.pos == pMD->coder.size);
struct SMDListNode *pNode;
pNode = TD_SLIST_HEAD(&(pMD->dStack));
ASSERT(pNode);
TD_SLIST_POP(&(pMD->dStack));
pNode->coder.pos += pMD->coder.size;
pMD->coder = pNode->coder;
free(pNode);
}
\ No newline at end of file
...@@ -11,4 +11,6 @@ target_link_libraries( ...@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom PRIVATE os util transport qcom
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
\ No newline at end of file ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
...@@ -38,6 +38,7 @@ typedef struct WriterCtx { ...@@ -38,6 +38,7 @@ typedef struct WriterCtx {
int fd; int fd;
bool readOnly; bool readOnly;
char buf[256]; char buf[256];
int size;
} file; } file;
struct { struct {
int32_t capa; int32_t capa;
......
...@@ -73,6 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -73,6 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx); // sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path); sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { goto END; }
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1; sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1); sIdx->path = calloc(1, strlen(path) + 1);
...@@ -83,6 +84,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -83,6 +84,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return 0; return 0;
#endif #endif
END:
if (sIdx != NULL) { indexClose(sIdx); }
*index = NULL; *index = NULL;
return -1; return -1;
...@@ -135,7 +138,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -135,7 +138,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
...@@ -150,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -150,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
...@@ -212,7 +215,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result ...@@ -212,7 +215,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
indexInterResultsDestroy(interResults); indexInterResultsDestroy(interResults);
#endif #endif
return 1; return 0;
} }
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
...@@ -310,7 +313,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -310,7 +313,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName}; ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000 #define MEM_TERM_LIMIT 5 * 10000
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
......
...@@ -72,9 +72,17 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -72,9 +72,17 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if (readOnly == false) { if (readOnly == false) {
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenCreateWriteAppend(path); ctx->file.fd = tfOpenCreateWriteAppend(path);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
} else { } else {
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); // ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenRead(path); ctx->file.fd = tfOpenRead(path);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
} }
memcpy(ctx->file.buf, path, strlen(path)); memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.fd < 0) { if (ctx->file.fd < 0) {
...@@ -104,6 +112,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { ...@@ -104,6 +112,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
free(ctx->mem.buf); free(ctx->mem.buf);
} else { } else {
tfClose(ctx->file.fd); tfClose(ctx->file.fd);
ctx->flush(ctx);
if (remove) { unlink(ctx->file.buf); } if (remove) { unlink(ctx->file.buf); }
} }
free(ctx); free(ctx);
......
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* p *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation. * or later ("AGPL"), as published by the Free Software Foundation.
...@@ -45,12 +45,13 @@ static int tfileReaderLoadHeader(TFileReader* reader); ...@@ -45,12 +45,13 @@ static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static int tfileGetFileList(const char* path, SArray* result); static SArray* tfileGetFileList(const char* path);
static int tfileRmExpireFile(SArray* result); static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem); static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b); static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version);
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version);
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache)); TFileCache* tcache = calloc(1, sizeof(TFileCache));
...@@ -59,21 +60,24 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -59,21 +60,24 @@ TFileCache* tfileCacheCreate(const char* path) {
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64; tcache->capacity = 64;
SArray* files = taosArrayInit(4, sizeof(void*)); SArray* files = tfileGetFileList(path);
tfileGetFileList(path, files);
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
uint64_t suid; uint64_t suid;
int32_t colId, version; int32_t colId, version;
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i); char* file = taosArrayGetP(files, i);
if (0 != tfileParseFileName(file, &suid, (int*)&colId, (int*)&version)) {
// refactor later, use colname and version info
char colName[256] = {0};
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
indexInfo("try parse invalid file: %s, skip it", file); indexInfo("try parse invalid file: %s, skip it", file);
continue; continue;
} }
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64); char fullName[256] = {0};
sprintf(fullName, "%s/%s", path, file);
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
if (wc == NULL) { if (wc == NULL) {
indexError("failed to open index:%s", file); indexError("failed to open index:%s", file);
goto End; goto End;
...@@ -200,12 +204,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul ...@@ -200,12 +204,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
} }
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0}; char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64); WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) { return NULL; } if (wcx == NULL) { return NULL; }
...@@ -218,13 +219,11 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -218,13 +219,11 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return tfileWriterCreate(wcx, &tfh); return tfileWriterCreate(wcx, &tfh);
} }
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) { TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0}; char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
// indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) { return NULL; } if (wc == NULL) { return NULL; }
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
...@@ -324,7 +323,6 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -324,7 +323,6 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
} }
// write data // write data
indexError("--------Begin----------------");
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
// TODO, fst batch write later // TODO, fst batch write later
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
...@@ -332,11 +330,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -332,11 +330,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId)); (int)taosArrayGetSize(v->tableId));
} else { } else {
indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, // indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId)); // (int)taosArrayGetSize(v->tableId));
} }
} }
indexError("--------End----------------");
fstBuilderFinish(tw->fb); fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb); fstBuilderDestroy(tw->fb);
tw->fb = NULL; tw->fb = NULL;
...@@ -361,6 +358,7 @@ IndexTFile* indexTFileCreate(const char* path) { ...@@ -361,6 +358,7 @@ IndexTFile* indexTFileCreate(const char* path) {
return tfile; return tfile;
} }
void indexTFileDestroy(IndexTFile* tfile) { void indexTFileDestroy(IndexTFile* tfile) {
if (tfile == NULL) { return; }
tfileCacheDestroy(tfile->cache); tfileCacheDestroy(tfile->cache);
free(tfile); free(tfile);
} }
...@@ -550,6 +548,9 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -550,6 +548,9 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
// //
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf); errno, reader->ctx->file.fd, reader->ctx->file.buf);
} else {
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
} }
// assert(nread == sizeof(buf)); // assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf));
...@@ -558,13 +559,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -558,13 +559,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
} }
static int tfileReaderLoadFst(TFileReader* reader) { static int tfileReaderLoadFst(TFileReader* reader) {
// current load fst into memory, refactor it later // current load fst into memory, refactor it later
static int FST_MAX_SIZE = 64 * 1024; static int FST_MAX_SIZE = 64 * 1024 * 1024;
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
if (buf == NULL) { return -1; } if (buf == NULL) { return -1; }
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf);
// we assuse fst size less than FST_MAX_SIZE // we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE); assert(nread > 0 && nread < FST_MAX_SIZE);
...@@ -608,19 +610,26 @@ void tfileReaderUnRef(TFileReader* reader) { ...@@ -608,19 +610,26 @@ void tfileReaderUnRef(TFileReader* reader) {
} }
} }
static int tfileGetFileList(const char* path, SArray* result) { static SArray* tfileGetFileList(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*));
DIR* dir = opendir(path); DIR* dir = opendir(path);
if (NULL == dir) { return -1; } if (NULL == dir) { return NULL; }
struct dirent* entry; struct dirent* entry;
while ((entry = readdir(dir)) != NULL) { while ((entry = readdir(dir)) != NULL) {
if (entry->d_type && DT_DIR) { continue; }
size_t len = strlen(entry->d_name); size_t len = strlen(entry->d_name);
char* buf = calloc(1, len + 1); char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len); memcpy(buf, entry->d_name, len);
taosArrayPush(result, &buf); taosArrayPush(files, &buf);
} }
closedir(dir); closedir(dir);
return 0;
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
return files;
} }
static int tfileRmExpireFile(SArray* result) { static int tfileRmExpireFile(SArray* result) {
// TODO(yihao): remove expire tindex after restart // TODO(yihao): remove expire tindex after restart
...@@ -641,15 +650,21 @@ static int tfileCompare(const void* a, const void* b) { ...@@ -641,15 +650,21 @@ static int tfileCompare(const void* a, const void* b) {
if (ret == 0) { return ret; } if (ret == 0) { return ret; }
return ret < 0 ? -1 : 1; return ret < 0 ? -1 : 1;
} }
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) { static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) {
sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version); if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) {
return;
}
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
// read suid & colid & version success // read suid & colid & version success
return 0; return 0;
} }
return -1; return -1;
} }
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) {
sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version);
return;
}
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) {
char filename[128] = {0};
tfileGenFileName(filename, suid, col, version);
sprintf(fullname, "%s/%s", path, filename);
}
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <algorithm>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <thread> #include <thread>
...@@ -638,7 +639,7 @@ class IndexObj { ...@@ -638,7 +639,7 @@ class IndexObj {
indexInit(); indexInit();
} }
int Init(const std::string& dir) { int Init(const std::string& dir) {
taosRemoveDir(dir.c_str()); // taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str()); taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx); int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) { if (ret != 0) {
...@@ -663,10 +664,11 @@ class IndexObj { ...@@ -663,10 +664,11 @@ class IndexObj {
int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world", int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) { size_t numOfTable = 100 * 10000) {
std::string tColVal = colVal; std::string tColVal = colVal;
size_t colValSize = tColVal.size();
for (int i = 0; i < numOfTable; i++) { for (int i = 0; i < numOfTable; i++) {
tColVal[tColVal.size() - 1] = 'a' + i % 26; tColVal[i % colValSize] = 'a' + i % 26;
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); tColVal.c_str(), tColVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < 10; i++) {
...@@ -695,7 +697,13 @@ class IndexObj { ...@@ -695,7 +697,13 @@ class IndexObj {
indexMultiTermQueryAdd(mq, term, QUERY_TERM); indexMultiTermQueryAdd(mq, term, QUERY_TERM);
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
int64_t s = taosGetTimestampUs();
if (Search(mq, result) == 0) {
int64_t e = taosGetTimestampUs();
std::cout << "search one successfully and time cost:" << e - s << std::endl;
} else {
}
int sz = taosArrayGetSize(result); int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
taosArrayDestroy(result); taosArrayDestroy(result);
...@@ -810,7 +818,7 @@ TEST_F(IndexEnv2, testIndexOpen) { ...@@ -810,7 +818,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
} }
TEST_F(IndexEnv2, testIndex_TrigeFlush) { TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp/test1"; std::string path = "/tmp/testxxx";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {
// r // r
std::cout << "failed to init" << std::endl; std::cout << "failed to init" << std::endl;
...@@ -826,6 +834,10 @@ static void write_and_search(IndexObj* idx) { ...@@ -826,6 +834,10 @@ static void write_and_search(IndexObj* idx) {
std::string colName("tag1"), colVal("Hello"); std::string colName("tag1"), colVal("Hello");
int target = idx->SearchOne("tag1", "Hello"); int target = idx->SearchOne("tag1", "Hello");
std::cout << "search: " << target << std::endl;
target = idx->SearchOne("tag2", "Test");
std::cout << "search: " << target << std::endl;
idx->PutOne(colName, colVal); idx->PutOne(colName, colVal);
} }
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
...@@ -833,7 +845,10 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { ...@@ -833,7 +845,10 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
if (index->Init(path) != 0) { if (index->Init(path) != 0) {
// opt // opt
} }
index->WriteMultiMillonData("tag1", "Hello", 200000); index->PutOne("tag1", "Hello");
index->PutOne("tag2", "Test");
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000);
index->WriteMultiMillonData("tag2", "Test", 50 * 10000);
std::thread threads[NUM_OF_THREAD]; std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) { for (int i = 0; i < NUM_OF_THREAD; i++) {
...@@ -847,15 +862,15 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { ...@@ -847,15 +862,15 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
} }
TEST_F(IndexEnv2, testIndex_restart) { TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp"; std::string path = "/tmp/test1";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {}
} }
TEST_F(IndexEnv2, testIndex_performance) { TEST_F(IndexEnv2, testIndex_performance) {
std::string path = "/tmp"; std::string path = "/tmp/test2";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {}
} }
TEST_F(IndexEnv2, testIndexMultiTag) { TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp"; std::string path = "/tmp/test3";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {}
} }
...@@ -11,4 +11,6 @@ target_link_libraries( ...@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util catalog function transport qcom PRIVATE os util catalog function transport qcom
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
...@@ -392,7 +392,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. { ...@@ -392,7 +392,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
%type create_stable_args{SCreateTableSql*} %type create_stable_args{SCreateTableSql*}
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. { create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE); A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
V.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &V, &U); setCreatedTableName(pInfo, &V, &U);
......
...@@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) { ...@@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
taosArrayDestroy(pInfo->funcs); taosArrayDestroy(pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) { if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(&pInfo->sub); destroyAllSqlNode(&pInfo->sub);
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) { } else if (pInfo->type == TSDB_SQL_CREATE_STABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem); taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
......
...@@ -761,8 +761,9 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c ...@@ -761,8 +761,9 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
break; break;
} }
case TSDB_SQL_CREATE_TABLE: { case TSDB_SQL_CREATE_STABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
...@@ -772,13 +773,6 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c ...@@ -772,13 +773,6 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
TSDB_CODE_SUCCESS) {
goto _error;
}
pDcl->msgType = TDMT_VND_CREATE_TABLE;
} else if (pCreateTable->type == TSQL_CREATE_STREAM) { } else if (pCreateTable->type == TSQL_CREATE_STREAM) {
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) { // if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code; // return code;
......
此差异已折叠。
...@@ -11,4 +11,6 @@ target_link_libraries( ...@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util catalog cjson parser transport function qcom PRIVATE os util catalog cjson parser transport function qcom
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
...@@ -70,6 +70,11 @@ typedef struct SQueryPlanNode { ...@@ -70,6 +70,11 @@ typedef struct SQueryPlanNode {
struct SQueryPlanNode *pParent; struct SQueryPlanNode *pParent;
} SQueryPlanNode; } SQueryPlanNode;
typedef struct SDataPayloadInfo {
int32_t msgType;
SArray *payload;
} SDataPayloadInfo;
/** /**
* Optimize the query execution plan, currently not implement yet. * Optimize the query execution plan, currently not implement yet.
* @param pQueryNode * @param pQueryNode
...@@ -101,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); ...@@ -101,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan); int32_t stringToSubplan(const char* str, SSubplan** subplan);
......
...@@ -37,18 +37,28 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { ...@@ -37,18 +37,28 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0; return 0;
} }
static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode; SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); *pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) {
SDataPayloadInfo* pPayload = calloc(1, sizeof(SDataPayloadInfo));
if (NULL == *pQueryPlan || NULL == blocks || NULL == pPayload) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pQueryPlan)->info.type = QNODE_MODIFY; (*pQueryPlan)->info.type = QNODE_MODIFY;
taosArrayAddAll(blocks, pInsert->pDataBlocks); taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks;
if (pNode->type == TSDB_SQL_INSERT) {
pPayload->msgType = TDMT_VND_SUBMIT;
} else if (pNode->type == TSDB_SQL_CREATE_TABLE) {
pPayload->msgType = TDMT_VND_CREATE_TABLE;
}
pPayload->payload = blocks;
(*pQueryPlan)->pExtInfo = pPayload;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -69,7 +79,7 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { ...@@ -69,7 +79,7 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
case TSDB_SQL_INSERT: case TSDB_SQL_INSERT:
case TSDB_SQL_CREATE_TABLE: case TSDB_SQL_CREATE_TABLE:
return createInsertPlan(pNode, pQueryPlan); return createModificationOpPlan(pNode, pQueryPlan);
default: default:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -191,13 +191,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -191,13 +191,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
subplan->level = 0; subplan->level = 0;
if (NULL != pCxt->pCurrentSubplan) { if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1; subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) { if (NULL == pCxt->pCurrentSubplan->pChildren) {
pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
} }
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
} }
SArray* currentLevel; SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
...@@ -205,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -205,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
} else { } else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
} }
taosArrayPush(currentLevel, &subplan); taosArrayPush(currentLevel, &subplan);
pCxt->pCurrentSubplan = subplan; pCxt->pCurrentSubplan = subplan;
++(pCxt->pDag->numOfSubplans); ++(pCxt->pDag->numOfSubplans);
...@@ -278,6 +281,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -278,6 +281,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
default: default:
assert(false); assert(false);
} }
if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) { if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
size_t size = taosArrayGetSize(pPlanNode->pChildren); size_t size = taosArrayGetSize(pPlanNode->pChildren);
...@@ -287,30 +291,37 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -287,30 +291,37 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
taosArrayPush(node->pChildren, &child); taosArrayPush(node->pChildren, &child);
} }
} }
return node; return node;
} }
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SArray* vgs = (SArray*)pPlanNode->pExtInfo; SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
size_t numOfVg = taosArrayGetSize(vgs);
for (int32_t i = 0; i < numOfVg; ++i) { size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
for (int32_t i = 0; i < numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode); vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
subplan->pNode = NULL;
subplan->pDataSink = createDataInserter(pCxt, blocks); subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY; subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
RECOVERY_CURRENT_SUBPLAN(pCxt); RECOVERY_CURRENT_SUBPLAN(pCxt);
} }
} }
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_MODIFY == pRoot->info.type) { if (QNODE_MODIFY == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot); splitModificationOpSubPlan(pCxt, pRoot);
} else { } else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
++(pCxt->nextId.templateId); ++(pCxt->nextId.templateId);
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot); subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot); subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
} }
...@@ -325,6 +336,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -325,6 +336,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
.pCurrentSubplan = NULL, .pCurrentSubplan = NULL,
.nextId = {0} // todo queryid .nextId = {0} // todo queryid
}; };
*pDag = context.pDag; *pDag = context.pDag;
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode); createSubplanByLevel(&context, pQueryNode);
...@@ -336,6 +348,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -336,6 +348,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
//todo //todo
} }
...@@ -793,7 +793,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) { ...@@ -793,7 +793,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
return NULL; return NULL;
} }
// The 'type', 'level', 'execEpSet', 'pChildern' and 'pParents' fields do not need to be serialized. // The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id); bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
if (res) { if (res) {
...@@ -835,7 +835,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { ...@@ -835,7 +835,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
SDataInserter* insert = (SDataInserter*)(subplan->pDataSink); SDataInserter* insert = (SDataInserter*)(subplan->pDataSink);
*len = insert->size; *len = insert->size;
*str = insert->pData; *str = insert->pData;
insert->pData == NULL; insert->pData = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -844,6 +844,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { ...@@ -844,6 +844,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
*str = cJSON_Print(json); *str = cJSON_Print(json);
*len = strlen(*str) + 1; *len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -31,22 +31,25 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) ...@@ -31,22 +31,25 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
return code; return code;
} }
code = optimizeQueryPlan(logicPlan); code = optimizeQueryPlan(logicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
return code; return code;
} }
code = createDag(logicPlan, NULL, pDag); code = createDag(logicPlan, NULL, pDag);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
qDestroyQueryDag(*pDag); qDestroyQueryDag(*pDag);
return code; return code;
} }
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return setSubplanExecutionNode(subplan, templateId, ep); return setSubplanExecutionNode(subplan, templateId, ep);
} }
......
...@@ -11,4 +11,6 @@ target_link_libraries( ...@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport PRIVATE os util transport
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
...@@ -11,4 +11,6 @@ target_link_libraries( ...@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport planner qcom PRIVATE os util transport planner qcom
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
\ No newline at end of file ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
...@@ -12,4 +12,6 @@ target_link_libraries( ...@@ -12,4 +12,6 @@ target_link_libraries(
PRIVATE os util planner qcom common catalog transport PRIVATE os util planner qcom common catalog transport
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
\ No newline at end of file ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
...@@ -66,7 +66,9 @@ typedef struct SSchTask { ...@@ -66,7 +66,9 @@ typedef struct SSchTask {
char *msg; // operator tree char *msg; // operator tree
int32_t msgLen; // msg length int32_t msgLen; // msg length
int8_t status; // task status int8_t status; // task status
SEpAddr execAddr; // task actual executed node address SQueryNodeAddr execAddr; // task actual executed node address
int8_t condidateIdx; // current try condidation index
SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SQueryProfileSummary summary; // task execution summary SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
......
...@@ -28,7 +28,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { ...@@ -28,7 +28,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t m = 0; m < level->taskNum; ++m) { for (int32_t m = 0; m < level->taskNum; ++m) {
SSchTask *task = taosArrayGet(level->subTasks, m); SSchTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan; SSubplan *plan = task->plan;
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
if (childNum > 0) { if (childNum > 0) {
...@@ -40,7 +40,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { ...@@ -40,7 +40,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
} }
for (int32_t n = 0; n < childNum; ++n) { for (int32_t n = 0; n < childNum; ++n) {
SSubplan **child = taosArrayGet(plan->pChildern, n); SSubplan **child = taosArrayGet(plan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) { if (NULL == childTask || NULL == *childTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
...@@ -122,6 +122,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { ...@@ -122,6 +122,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
//??
job->attr.needFetch = true; job->attr.needFetch = true;
job->levelNum = levelNum; job->levelNum = levelNum;
...@@ -215,28 +216,49 @@ _return: ...@@ -215,28 +216,49 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { if (task->condidateAddrs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
task->condidateIdx = 0;
task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == task->condidateAddrs) {
qError("taosArrayInit failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (task->plan->execNode.numOfEps > 0) {
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) {
qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
int32_t addNum = 0;
int32_t nodeNum = taosArrayGetSize(job->nodeList); int32_t nodeNum = taosArrayGetSize(job->nodeList);
for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
SEpAddr *addr = taosArrayGet(job->nodeList, i); SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) {
epSet->port[epSet->numOfEps] = addr->port; qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
++epSet->numOfEps; ++addNum;
} }
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) { /*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps; ++epSet->numOfEps;
} }
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -383,8 +405,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { ...@@ -383,8 +405,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn));
job->resEp.port = task->execAddr.port; job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port;
} }
job->fetchTask = task; job->fetchTask = task;
...@@ -394,12 +416,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { ...@@ -394,12 +416,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
++job->dataSrcEps.numOfEps; ++job->dataSrcEps.numOfEps;
} }
*/
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
...@@ -456,12 +480,24 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { ...@@ -456,12 +480,24 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
// job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
}
}
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; if (rspCode != TSDB_CODE_SUCCESS) {
if (rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else { } else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
job->resNumOfRows += rsp->affectedRows; job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task); code = schProcessOnTaskSuccess(job, task);
...@@ -547,22 +583,29 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -547,22 +583,29 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return: _return:
tfree(param); tfree(param);
SCH_RET(code); SCH_RET(code);
} }
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
} }
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
} }
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
} }
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
} }
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param; SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
...@@ -570,6 +613,9 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -570,6 +613,9 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTableCallback;
break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback; *fp = schHandleSubmitCallback;
break; break;
...@@ -633,6 +679,16 @@ _return: ...@@ -633,6 +679,16 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
epSet->inUse = addr->inUse;
epSet->numOfEps = addr->numOfEps;
for (int8_t i = 0; i < epSet->numOfEps; ++i) {
strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn));
epSet->port[i] = addr->epAddr[i].port;
}
}
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
uint32_t msgSize = 0; uint32_t msgSize = 0;
...@@ -640,6 +696,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -640,6 +696,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t code = 0; int32_t code = 0;
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) { if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL"); qError("submit msg is NULL");
...@@ -665,6 +722,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -665,6 +722,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
SSubQueryMsg *pMsg = msg; SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId); pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
...@@ -681,6 +739,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -681,6 +739,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
} }
SResReadyMsg *pMsg = msg; SResReadyMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId); pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
...@@ -698,6 +758,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -698,6 +758,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
} }
SResFetchMsg *pMsg = msg; SResFetchMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId); pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
...@@ -712,6 +774,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -712,6 +774,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
} }
STaskDropMsg *pMsg = msg; STaskDropMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId); pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
...@@ -723,7 +787,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -723,7 +787,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
break; break;
} }
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize)); SEpSet epSet;
SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx);
schConvertAddrToEpSet(addr, &epSet);
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -737,28 +806,22 @@ _return: ...@@ -737,28 +806,22 @@ _return:
int32_t schLaunchTask(SSchJob *job, SSchTask *task) { int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SSubplan *plan = task->plan; SSubplan *plan = task->plan;
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
if (plan->execEpSet.numOfEps <= 0) { SCH_ERR_RET(schSetTaskCondidateAddrs(job, task));
SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
}
if (plan->execEpSet.numOfEps <= 0) { if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) {
SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY; // int32_t msgType = (plan->type == QUERY_TYPE_MODIFY)? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
task->status = JOB_TASK_STATUS_EXECUTING; task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schLaunchJob(SSchJob *job) { int32_t schLaunchJob(SSchJob *job) {
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) { for (int32_t i = 0; i < level->taskNum; ++i) {
......
...@@ -54,11 +54,13 @@ void schtBuildQueryDag(SQueryDag *dag) { ...@@ -54,11 +54,13 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan.id.templateId = 0x0000000000000002; scanPlan.id.templateId = 0x0000000000000002;
scanPlan.id.subplanId = 0x0000000000000003; scanPlan.id.subplanId = 0x0000000000000003;
scanPlan.type = QUERY_TYPE_SCAN; scanPlan.type = QUERY_TYPE_SCAN;
scanPlan.execNode.numOfEps = 1;
scanPlan.execNode.nodeId = 1;
scanPlan.execNode.inUse = 0;
scanPlan.execNode.epAddr[0].port = 6030;
strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0");
scanPlan.pChildren = NULL;
scanPlan.level = 1; scanPlan.level = 1;
scanPlan.execEpSet.numOfEps = 1;
scanPlan.execEpSet.port[0] = 6030;
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
scanPlan.pChildern = NULL;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
...@@ -67,15 +69,15 @@ void schtBuildQueryDag(SQueryDag *dag) { ...@@ -67,15 +69,15 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan.id.subplanId = 0x5555555555; mergePlan.id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE; mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0; mergePlan.level = 0;
mergePlan.execEpSet.numOfEps = 0; mergePlan.execNode.numOfEps = 0;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL; mergePlan.pParents = NULL;
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildern, &scanPointer); taosArrayPush(mergePointer->pChildren, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer); taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(dag->pSubplans, &merge); taosArrayPush(dag->pSubplans, &merge);
...@@ -97,10 +99,12 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -97,10 +99,12 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].id.subplanId = 0x0000000000000004; insertPlan[0].id.subplanId = 0x0000000000000004;
insertPlan[0].type = QUERY_TYPE_MODIFY; insertPlan[0].type = QUERY_TYPE_MODIFY;
insertPlan[0].level = 0; insertPlan[0].level = 0;
insertPlan[0].execEpSet.numOfEps = 1; insertPlan[0].execNode.numOfEps = 1;
insertPlan[0].execEpSet.port[0] = 6030; insertPlan[0].execNode.nodeId = 1;
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0"); insertPlan[0].execNode.inUse = 0;
insertPlan[0].pChildern = NULL; insertPlan[0].execNode.epAddr[0].port = 6030;
strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
insertPlan[0].pChildren = NULL;
insertPlan[0].pParents = NULL; insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL; insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
...@@ -110,10 +114,12 @@ void schtBuildInsertDag(SQueryDag *dag) { ...@@ -110,10 +114,12 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].id.subplanId = 0x0000000000000005; insertPlan[1].id.subplanId = 0x0000000000000005;
insertPlan[1].type = QUERY_TYPE_MODIFY; insertPlan[1].type = QUERY_TYPE_MODIFY;
insertPlan[1].level = 0; insertPlan[1].level = 0;
insertPlan[1].execEpSet.numOfEps = 1; insertPlan[1].execNode.numOfEps = 1;
insertPlan[1].execEpSet.port[0] = 6030; insertPlan[1].execNode.nodeId = 1;
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1"); insertPlan[1].execNode.inUse = 1;
insertPlan[1].pChildern = NULL; insertPlan[1].execNode.epAddr[0].port = 6030;
strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
insertPlan[1].pChildren = NULL;
insertPlan[1].pParents = NULL; insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL; insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
...@@ -132,7 +138,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { ...@@ -132,7 +138,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
return 0; return 0;
} }
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return 0; return 0;
} }
......
...@@ -14,4 +14,7 @@ target_link_libraries( ...@@ -14,4 +14,7 @@ target_link_libraries(
PUBLIC api PUBLIC api
) )
ADD_SUBDIRECTORY(test) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "encode.h"
#if __STDC_VERSION__ >= 201112L
static_assert(sizeof(float) == sizeof(uint32_t), "sizeof(float) must equal to sizeof(uint32_t)");
static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) must equal to sizeof(uint64_t)");
#endif
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type) {
if (type == TD_ENCODER) {
if (data == NULL) size = 0;
} else {
ASSERT(data && size > 0);
}
pCoder->type = type;
pCoder->endian = endian;
pCoder->data = data;
pCoder->size = size;
pCoder->pos = 0;
tFreeListInit(&(pCoder->fl));
TD_SLIST_INIT(&(pCoder->stack));
}
void tCoderClear(SCoder* pCoder) {
tFreeListClear(&(pCoder->fl));
struct SCoderNode* pNode;
for (;;) {
pNode = TD_SLIST_HEAD(&(pCoder->stack));
if (pNode == NULL) break;
TD_SLIST_POP(&(pCoder->stack));
free(pNode);
}
}
int tStartEncode(SCoder* pCoder) {
struct SCoderNode* pNode;
ASSERT(pCoder->type == TD_ENCODER);
if (pCoder->data) {
if (pCoder->size - pCoder->pos < sizeof(int32_t)) return -1;
pNode = malloc(sizeof(*pNode));
if (pNode == NULL) return -1;
pNode->data = pCoder->data;
pNode->pos = pCoder->pos;
pNode->size = pCoder->size;
pCoder->data = pNode->data + pNode->pos + sizeof(int32_t);
pCoder->pos = 0;
pCoder->size = pNode->size - pNode->pos - sizeof(int32_t);
TD_SLIST_PUSH(&(pCoder->stack), pNode);
} else {
pCoder->pos += sizeof(int32_t);
}
return 0;
}
void tEndEncode(SCoder* pCoder) {
struct SCoderNode* pNode;
int32_t len;
ASSERT(pCoder->type == TD_ENCODER);
if (pCoder->data) {
pNode = TD_SLIST_HEAD(&(pCoder->stack));
ASSERT(pNode);
TD_SLIST_POP(&(pCoder->stack));
len = pCoder->pos;
pCoder->data = pNode->data;
pCoder->size = pNode->size;
pCoder->pos = pNode->pos;
if (TD_RT_ENDIAN() == pCoder->endian) {
tPut(int32_t, pCoder->data + pCoder->pos, len);
} else {
tRPut32(pCoder->data + pCoder->pos, len);
}
TD_CODER_MOVE_POS(pCoder, len + sizeof(int32_t));
free(pNode);
}
}
int tStartDecode(SCoder* pCoder) {
int32_t len;
struct SCoderNode* pNode;
ASSERT(pCoder->type == TD_DECODER);
if (tDecodeI32(pCoder, &len) < 0) return -1;
pNode = malloc(sizeof(*pNode));
if (pNode == NULL) return -1;
pNode->data = pCoder->data;
pNode->pos = pCoder->pos;
pNode->size = pCoder->size;
pCoder->data = pNode->data + pNode->pos;
pCoder->size = len;
pCoder->pos = 0;
TD_SLIST_PUSH(&(pCoder->stack), pNode);
return 0;
}
void tEndDecode(SCoder* pCoder) {
struct SCoderNode* pNode;
ASSERT(pCoder->type == TD_DECODER);
ASSERT(tDecodeIsEnd(pCoder));
pNode = TD_SLIST_HEAD(&(pCoder->stack));
ASSERT(pNode);
TD_SLIST_POP(&(pCoder->stack));
pCoder->data = pNode->data;
pCoder->size = pNode->size;
pCoder->pos = pCoder->pos + pNode->pos;
free(pNode);
}
...@@ -41,4 +41,8 @@ target_sources(freelistTest ...@@ -41,4 +41,8 @@ target_sources(freelistTest
) )
target_link_libraries(freelistTest os util gtest gtest_main) target_link_libraries(freelistTest os util gtest gtest_main)
# encodeTest
add_executable(encodeTest "encodeTest.cpp")
target_link_libraries(encodeTest os util gtest gtest_main)
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册