diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index d46c32d73d823d52c739fc01d1d11c0a59f27168..718dfcf475136aab944c0b9c889a35d0f48c2bba 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -25,6 +25,7 @@ extern "C" { */ #include "os.h" #include "tbuffer.h" +#include "exception.h" #include "qextbuffer.h" #include "taosdef.h" #include "tscSecondaryMerge.h" @@ -177,7 +178,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid); -void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBuffer* pBuf); +void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw); void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 6e16606695430c7d66bb23733a80dcf94248cea6..5590ac5a01aee5f8b49b1a35a543efc063417ed1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1185,10 +1185,18 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel return invalidSqlErrMsg(pQueryInfo->msg, "invalid arithmetic expression in select clause"); } - SBuffer buf = exprTreeToBinary(pNode); + SBufferWriter bw = tbufInitWriter(NULL, false); + + TRY(0) { + exprTreeToBinary(&bw, pNode); + } CATCH(code) { + tbufCloseWriter(&bw); + UNUSED(code); + // TODO: other error handling + } END_TRY - size_t len = tbufTell(&buf); - char* c = tbufGetData(&buf, true); + size_t len = tbufTell(&bw); + char* c = tbufGetData(&bw, true); // set the serialized binary string as the parameter of arithmetic expression addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, len, index.tableIndex); @@ -3751,7 +3759,15 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, SArray* colList = taosArrayInit(10, sizeof(SColIndex)); ret = exprTreeFromSqlExpr(&p, p1, NULL, pQueryInfo, colList); - SBuffer buf = exprTreeToBinary(p); + SBufferWriter bw = tbufInitWriter(NULL, false); + + TRY(0) { + exprTreeToBinary(&bw, p); + } CATCH(code) { + tbufCloseWriter(&bw); + UNUSED(code); + // TODO: more error handling + } END_TRY // add to source column list STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); @@ -3765,7 +3781,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, addRequiredTagColumn(pTableMetaInfo, &index); } - tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &buf); + tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &bw); doCompactQueryExpr(pExpr); tSQLExprDestroy(p1); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6b8b2b38b408ba05a7a1008dd99b04c3987a5c69..88ce13e560a1e2ecdab9566222f282d978b9801f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -47,18 +47,18 @@ SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) { return NULL; } -void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBuffer* pBuf) { - if (tbufTell(pBuf) == 0) { +void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) { + if (tbufTell(bw) == 0) { return; } SCond cond = { .uid = uid, - .len = tbufTell(pBuf), + .len = tbufTell(bw), .cond = NULL, }; - cond.cond = tbufGetData(pBuf, true); + cond.cond = tbufGetData(bw, true); if (pTagCond->pCond == NULL) { pTagCond->pCond = taosArrayInit(3, sizeof(SCond)); diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index 903d54a18f7370b4a3d896a5c071eb9618cb78c4..6c997d5a36dd3ba379f0d490c7178c02b2340df0 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -90,9 +90,10 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res); uint8_t getBinaryExprOptr(SSQLToken *pToken); -SBuffer exprTreeToBinary(tExprNode* pExprTree); +void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)); +void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree); -tExprNode* exprTreeFromBinary(const void* pBuf, size_t size); +tExprNode* exprTreeFromBinary(const void* data, size_t size); tExprNode* exprTreeFromTableName(const char* tbnameCond); #ifdef __cplusplus diff --git a/src/query/src/qast.c b/src/query/src/qast.c index fdcbeeeac0315ba29466a9c1d8a81969d4713a31..500a5f1e496c5ac30c3f3d6011098dca0c840be6 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -31,6 +31,7 @@ #include "tskiplist.h" #include "queryLog.h" #include "tsdbMain.h" +#include "exception.h" /* * @@ -44,7 +45,6 @@ * */ static tExprNode *tExprNodeCreate(SSchema *pSchema, int32_t numOfCols, SSQLToken *pToken); -static void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)); static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i); static void destroySyntaxTree(tExprNode *); @@ -428,7 +428,7 @@ void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len) { static void UNUSED_FUNC destroySyntaxTree(tExprNode *pNode) { tExprNodeDestroy(pNode, NULL); } -static void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) { +void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) { if (pNode == NULL) { return; } @@ -1023,104 +1023,116 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) { } } -static void exprTreeToBinaryImpl(tExprNode* pExprTree, SBuffer* pBuf) { - tbufWrite(pBuf, &pExprTree->nodeType, sizeof(pExprTree->nodeType)); +static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) { + tbufWriteUint8(bw, expr->nodeType); - if (pExprTree->nodeType == TSQL_NODE_VALUE) { - tVariant* pVal = pExprTree->pVal; + if (expr->nodeType == TSQL_NODE_VALUE) { + tVariant* pVal = expr->pVal; - tbufWrite(pBuf, &pVal->nType, sizeof(pVal->nType)); + tbufWriteUint32(bw, pVal->nType); if (pVal->nType == TSDB_DATA_TYPE_BINARY) { - tbufWrite(pBuf, &pVal->nLen, sizeof(pVal->nLen)); - tbufWrite(pBuf, pVal->pz, pVal->nLen); + tbufWriteInt32(bw, pVal->nLen); + tbufWrite(bw, pVal->pz, pVal->nLen); } else { - tbufWrite(pBuf, &pVal->pz, sizeof(pVal->i64Key)); + tbufWriteInt64(bw, pVal->i64Key); } - } else if (pExprTree->nodeType == TSQL_NODE_COL) { - SSchema* pSchema = pExprTree->pSchema; - tbufWrite(pBuf, &pSchema->colId, sizeof(pSchema->colId)); - tbufWrite(pBuf, &pSchema->bytes, sizeof(pSchema->bytes)); - tbufWrite(pBuf, &pSchema->type, sizeof(pSchema->type)); + } else if (expr->nodeType == TSQL_NODE_COL) { + SSchema* pSchema = expr->pSchema; + tbufWriteInt16(bw, pSchema->colId); + tbufWriteInt16(bw, pSchema->bytes); + tbufWriteUint8(bw, pSchema->type); + tbufWriteString(bw, pSchema->name); - int32_t len = strlen(pSchema->name); - tbufWriteStringLen(pBuf, pSchema->name, len); - - } else if (pExprTree->nodeType == TSQL_NODE_EXPR) { - tbufWrite(pBuf, &pExprTree->_node.optr, sizeof(pExprTree->_node.optr)); - tbufWrite(pBuf, &pExprTree->_node.hasPK, sizeof(pExprTree->_node.hasPK)); - - exprTreeToBinaryImpl(pExprTree->_node.pLeft, pBuf); - exprTreeToBinaryImpl(pExprTree->_node.pRight, pBuf); + } else if (expr->nodeType == TSQL_NODE_EXPR) { + tbufWriteUint8(bw, expr->_node.optr); + tbufWriteUint8(bw, expr->_node.hasPK); + exprTreeToBinaryImpl(bw, expr->_node.pLeft); + exprTreeToBinaryImpl(bw, expr->_node.pRight); } } -SBuffer exprTreeToBinary(tExprNode* pExprTree) { - SBuffer buf = {0}; - if (pExprTree == NULL) { - return buf; +void exprTreeToBinary(SBufferWriter* bw, tExprNode* expr) { + if (expr != NULL) { + exprTreeToBinaryImpl(bw, expr); } - - int32_t code = tbufBeginWrite(&buf); - if (code != 0) { - return buf; +} + +// TODO: these three functions should be made global +static void* exception_calloc(size_t nmemb, size_t size) { + void* p = calloc(nmemb, size); + if (p == NULL) { + THROW(TSDB_CODE_SERV_OUT_OF_MEMORY); } - - exprTreeToBinaryImpl(pExprTree, &buf); - return buf; + return p; +} + +static void* exception_malloc(size_t size) { + void* p = malloc(size); + if (p == NULL) { + THROW(TSDB_CODE_SERV_OUT_OF_MEMORY); + } + return p; } -static tExprNode* exprTreeFromBinaryImpl(SBuffer* pBuf) { - tExprNode* pExpr = calloc(1, sizeof(tExprNode)); - pExpr->nodeType = tbufReadUint8(pBuf); +static char* exception_strdup(const char* str) { + char* p = strdup(str); + if (p == NULL) { + THROW(TSDB_CODE_SERV_OUT_OF_MEMORY); + } + return p; +} + + +static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) { + int32_t anchor = CLEANUP_GET_ANCHOR(); + + tExprNode* pExpr = exception_calloc(1, sizeof(tExprNode)); + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, pExpr, NULL); + + pExpr->nodeType = tbufReadUint8(br); if (pExpr->nodeType == TSQL_NODE_VALUE) { - tVariant* pVal = calloc(1, sizeof(tVariant)); - if (pVal == NULL) { - // TODO: - } + tVariant* pVal = exception_calloc(1, sizeof(tVariant)); pExpr->pVal = pVal; - pVal->nType = tbufReadUint32(pBuf); + pVal->nType = tbufReadUint32(br); if (pVal->nType == TSDB_DATA_TYPE_BINARY) { - tbufReadToBuffer(pBuf, &pVal->nLen, sizeof(pVal->nLen)); + tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen)); pVal->pz = calloc(1, pVal->nLen + 1); - tbufReadToBuffer(pBuf, pVal->pz, pVal->nLen); + tbufReadToBuffer(br, pVal->pz, pVal->nLen); } else { - pVal->i64Key = tbufReadInt64(pBuf); + pVal->i64Key = tbufReadInt64(br); } } else if (pExpr->nodeType == TSQL_NODE_COL) { - SSchema* pSchema = calloc(1, sizeof(SSchema)); - if (pSchema == NULL) { - // TODO: - } + SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); pExpr->pSchema = pSchema; - pSchema->colId = tbufReadInt16(pBuf); - pSchema->bytes = tbufReadInt16(pBuf); - pSchema->type = tbufReadUint8(pBuf); - tbufReadToString(pBuf, pSchema->name, TSDB_COL_NAME_LEN); + pSchema->colId = tbufReadInt16(br); + pSchema->bytes = tbufReadInt16(br); + pSchema->type = tbufReadUint8(br); + tbufReadToString(br, pSchema->name, TSDB_COL_NAME_LEN); } else if (pExpr->nodeType == TSQL_NODE_EXPR) { - pExpr->_node.optr = tbufReadUint8(pBuf); - pExpr->_node.hasPK = tbufReadUint8(pBuf); - pExpr->_node.pLeft = exprTreeFromBinaryImpl(pBuf); - pExpr->_node.pRight = exprTreeFromBinaryImpl(pBuf); + pExpr->_node.optr = tbufReadUint8(br); + pExpr->_node.hasPK = tbufReadUint8(br); + pExpr->_node.pLeft = exprTreeFromBinaryImpl(br); + pExpr->_node.pRight = exprTreeFromBinaryImpl(br); assert(pExpr->_node.pLeft != NULL && pExpr->_node.pRight != NULL); } + CLEANUP_EXECUTE_TO(anchor, false); return pExpr; } -tExprNode* exprTreeFromBinary(const void* pBuf, size_t size) { +tExprNode* exprTreeFromBinary(const void* data, size_t size) { if (size == 0) { return NULL; } - SBuffer rbuf = {0}; - tbufBeginRead(&rbuf, pBuf, size); - return exprTreeFromBinaryImpl(&rbuf); + SBufferReader br = tbufInitReader(data, size, false); + return exprTreeFromBinaryImpl(&br); } tExprNode* exprTreeFromTableName(const char* tbnameCond) { @@ -1128,23 +1140,18 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { return NULL; } - tExprNode* expr = calloc(1, sizeof(tExprNode)); - if (expr == NULL) { - // TODO: - } + int32_t anchor = CLEANUP_GET_ANCHOR(); + + tExprNode* expr = exception_calloc(1, sizeof(tExprNode)); + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL); + expr->nodeType = TSQL_NODE_EXPR; - tExprNode* left = calloc(1, sizeof(tExprNode)); - if (left == NULL) { - // TODO: - } + tExprNode* left = exception_calloc(1, sizeof(tExprNode)); expr->_node.pLeft = left; left->nodeType = TSQL_NODE_COL; - SSchema* pSchema = calloc(1, sizeof(SSchema)); - if (pSchema == NULL) { - // TODO: - } + SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); left->pSchema = pSchema; pSchema->type = TSDB_DATA_TYPE_BINARY; @@ -1152,36 +1159,24 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { strcpy(pSchema->name, TSQL_TBNAME_L); pSchema->colId = -1; - tExprNode* right = calloc(1, sizeof(tExprNode)); - if (right == NULL) { - // TODO - } + tExprNode* right = exception_calloc(1, sizeof(tExprNode)); expr->_node.pRight = right; if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) { right->nodeType = TSQL_NODE_VALUE; expr->_node.optr = TSDB_RELATION_LIKE; - tVariant* pVal = calloc(1, sizeof(tVariant)); - if (pVal == NULL) { - // TODO: - } + tVariant* pVal = exception_calloc(1, sizeof(tVariant)); right->pVal = pVal; - pVal->nType = TSDB_DATA_TYPE_BINARY; size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1; - pVal->pz = malloc(len); - if (pVal->pz == NULL) { - // TODO: - } + pVal->pz = exception_malloc(len); memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len); + pVal->nType = TSDB_DATA_TYPE_BINARY; pVal->nLen = (int32_t)len; } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) { right->nodeType = TSQL_NODE_VALUE; expr->_node.optr = TSDB_RELATION_IN; - tVariant* pVal = calloc(1, sizeof(tVariant)); - if (pVal == NULL) { - // TODO: - } + tVariant* pVal = exception_calloc(1, sizeof(tVariant)); right->pVal = pVal; pVal->nType = TSDB_DATA_TYPE_ARRAY; pVal->arr = taosArrayInit(2, sizeof(char*)); @@ -1192,7 +1187,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { cond = e + 1; } else if (*e == ',') { size_t len = e - cond + 1; - char* p = malloc( len ); + char* p = exception_malloc( len ); memcpy(p, cond, len); p[len - 1] = 0; cond += len; @@ -1201,12 +1196,13 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { } if (*cond != 0) { - char* p = strdup( cond ); + char* p = exception_strdup( cond ); taosArrayPush(pVal->arr, &p); } taosArraySortString(pVal->arr); } + CLEANUP_EXECUTE_TO(anchor, false); return expr; } \ No newline at end of file diff --git a/src/query/tests/astTest.cpp b/src/query/tests/astTest.cpp index 6a78cfbe539e96a80d3fb26048552055e7561b23..dee85ef63002b91e08cf32d7f1eb6417c1afac7c 100644 --- a/src/query/tests/astTest.cpp +++ b/src/query/tests/astTest.cpp @@ -550,11 +550,12 @@ tExprNode* createExpr2() { void exprSerializeTest1() { tExprNode* p1 = createExpr1(); - SBuffer buf = exprTreeToBinary(p1); + SBufferWriter bw = tbufInitWriter(NULL, false); + exprTreeToBinary(&bw, p1); - size_t size = tbufTell(&buf); + size_t size = tbufTell(&bw); ASSERT_TRUE(size > 0); - char* b = tbufGetData(&buf, false); + char* b = tbufGetData(&bw, false); tExprNode* p2 = exprTreeFromBinary(b, size); ASSERT_EQ(p1->nodeType, p2->nodeType); @@ -581,16 +582,17 @@ void exprSerializeTest1() { tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p2, nullptr); - tbufClose(&buf, false); + tbufClose(&bw); } void exprSerializeTest2() { tExprNode* p1 = createExpr2(); - SBuffer buf = exprTreeToBinary(p1); + SBufferWriter bw = tbufInitWriter(NULL, false); + exprTreeToBinary(&bw, p1); - size_t size = tbufTell(&buf); + size_t size = tbufTell(&bw); ASSERT_TRUE(size > 0); - char* b = tbufGetData(&buf, false); + char* b = tbufGetData(&bw, false); tExprNode* p2 = exprTreeFromBinary(b, size); ASSERT_EQ(p1->nodeType, p2->nodeType); @@ -625,7 +627,7 @@ void exprSerializeTest2() { tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p2, nullptr); - tbufClose(&buf, false); + tbufClose(&bw); } } // namespace TEST(testCase, astTest) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index eb35be5383e1e82761a47de6fcd54c223687234b..bc9220dbc72b197785b5fa227340def9e1839dbb 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -18,6 +18,7 @@ #include "talgo.h" #include "tutil.h" #include "tcompare.h" +#include "exception.h" #include "../../../query/inc/qast.h" // todo move to common module #include "../../../query/inc/tlosertree.h" // todo move to util module @@ -1473,21 +1474,35 @@ int32_t tsdbQueryByTagsCond( } int32_t ret = TSDB_CODE_SUCCESS; + tExprNode* expr = NULL; - tExprNode* expr = exprTreeFromTableName(tbnameCond); - tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); - if (tagExpr != NULL) { + TRY(32) { + expr = exprTreeFromTableName(tbnameCond); if (expr == NULL) { - expr = tagExpr; + expr = exprTreeFromBinary(pTagCond, len); } else { - tExprNode* tbnameExpr = expr; - expr = calloc(1, sizeof(tExprNode)); - expr->nodeType = TSQL_NODE_EXPR; - expr->_node.optr = tagNameRelType; - expr->_node.pLeft = tagExpr; - expr->_node.pRight = tbnameExpr; + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL); + tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); + if (tagExpr != NULL) { + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, tagExpr, NULL); + tExprNode* tbnameExpr = expr; + expr = calloc(1, sizeof(tExprNode)); + if (expr == NULL) { + THROW( TSDB_CODE_SERV_OUT_OF_MEMORY ); + } + expr->nodeType = TSQL_NODE_EXPR; + expr->_node.optr = tagNameRelType; + expr->_node.pLeft = tagExpr; + expr->_node.pRight = tbnameExpr; + } } - } + CLEANUP_EXECUTE(); + + } CATCH( code ) { + CLEANUP_EXECUTE(); + ret = code; + // TODO: more error handling + } END_TRY doQueryTableList(pSTable, res, expr); pGroupInfo->numOfTables = taosArrayGetSize(res); diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h new file mode 100644 index 0000000000000000000000000000000000000000..41f01d68dd5fddd800c20b5d23c6b20fa1bb7b73 --- /dev/null +++ b/src/util/inc/exception.h @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_EXCEPTION_H +#define TDENGINE_EXCEPTION_H + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * cleanup actions + */ +typedef struct SCleanupAction { + bool failOnly; + uint8_t wrapper; + uint16_t reserved; + void* func; + union { + void* Ptr; + bool Bool; + char Char; + int8_t Int8; + uint8_t Uint8; + int16_t Int16; + uint16_t Uint16; + int Int; + unsigned int Uint; + int32_t Int32; + uint32_t Uint32; + int64_t Int64; + uint64_t Uint64; + float Float; + double Double; + } arg1, arg2; +} SCleanupAction; + + +/* + * exception hander registration + */ +typedef struct SExceptionNode { + struct SExceptionNode* prev; + jmp_buf jb; + int32_t code; + int32_t maxCleanupAction; + int32_t numCleanupAction; + SCleanupAction* cleanupActions; +} SExceptionNode; + +//////////////////////////////////////////////////////////////////////////////// +// functions & macros for auto-cleanup + +void cleanupPush_void_ptr_ptr ( bool failOnly, void* func, void* arg1, void* arg2 ); +void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool arg2 ); +void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg ); +void cleanupPush_int_int ( bool failOnly, void* func, int arg ); +void cleanupPush_void ( bool failOnly, void* func ); + +int32_t cleanupGetActionCount(); +void cleanupExecuteTo( int32_t anchor, bool failed ); +void cleanupExecute( SExceptionNode* node, bool failed ); + +#define CLEANUP_PUSH_VOID_PTR_PTR( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_ptr( (failOnly), (void*)(func), (void*)(arg1), (void*)(arg2) ) +#define CLEANUP_PUSH_VOID_PTR_BOOL( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_bool( (failOnly), (void*)(func), (void*)(arg1), (bool)(arg2) ) +#define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) ) +#define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) ) +#define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) ) +#define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) ) +#define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) ) + +#define CLEANUP_GET_ANCHOR() cleanupGetActionCount() +#define CLEANUP_EXECUTE_TO( anchor, failed ) cleanupExecuteTo( (anchor), (failed) ) + + +//////////////////////////////////////////////////////////////////////////////// +// functions & macros for exception handling + +void exceptionPushNode( SExceptionNode* node ); +int32_t exceptionPopNode(); +void exceptionThrow( int code ); + +#define TRY(maxCleanupActions) do { \ + SExceptionNode exceptionNode = { 0 }; \ + SCleanupAction cleanupActions[(maxCleanupActions) > 0 ? (maxCleanupActions) : 1]; \ + exceptionNode.maxCleanupAction = (maxCleanupActions) > 0 ? (maxCleanupActions) : 1; \ + exceptionNode.cleanupActions = cleanupActions; \ + exceptionPushNode( &exceptionNode ); \ + int caughtException = setjmp( exceptionNode.jb ); \ + if( caughtException == 0 ) + +#define CATCH( code ) int code = exceptionPopNode(); \ + if( caughtException == 1 ) + +#define FINALLY( code ) int code = exceptionPopNode(); + +#define END_TRY } while( 0 ); + +#define THROW( x ) exceptionThrow( (x) ) +#define CAUGHT_EXCEPTION() ((bool)(caughtException == 1)) +#define CLEANUP_EXECUTE() cleanupExecute( &exceptionNode, CAUGHT_EXCEPTION() ) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index 9bc0fd9f43eb9122aa50a438a3eb1053312c7d6e..e2bdb815d7ac8ecd09b62a3a62897b78ebf3c4b7 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -16,122 +16,163 @@ #ifndef TDENGINE_TBUFFER_H #define TDENGINE_TBUFFER_H -#include "setjmp.h" -#include "os.h" +#include +#include #ifdef __cplusplus extern "C" { #endif +//////////////////////////////////////////////////////////////////////////////// +// usage example /* -SBuffer can be used to read or write a buffer, but cannot be used for both -read & write at a same time. Below is an example: - -int main(int argc, char** argv) { - //--------------------- write ------------------------ - SBuffer wbuf; - int32_t code = tbufBeginWrite(&wbuf); - if (code != 0) { - // handle errors - return 0; - } - - // reserve 1024 bytes for the buffer to improve performance - tbufEnsureCapacity(&wbuf, 1024); - - // write 5 integers to the buffer - for (int i = 0; i < 5; i++) { - tbufWriteInt32(&wbuf, i); - } - - // write a string to the buffer - tbufWriteString(&wbuf, "this is a string.\n"); - - // acquire the result and close the write buffer - size_t size = tbufTell(&wbuf); - char* data = tbufGetData(&wbuf, true); - tbufClose(&wbuf, true); - - - //------------------------ read ----------------------- - SBuffer rbuf; - code = tbufBeginRead(&rbuf, data, size); - if (code != 0) { - printf("you will see this message after print out 5 integers and a string.\n"); - tbufClose(&rbuf, false); - return 0; - } - - // read & print out 5 integers - for (int i = 0; i < 5; i++) { - printf("%d\n", tbufReadInt32(&rbuf)); - } - - // read & print out a string - printf(tbufReadString(&rbuf, NULL)); - - // try read another integer, this result in an error as there no this integer - tbufReadInt32(&rbuf); - - printf("you should not see this message.\n"); - tbufClose(&rbuf, false); - +#include +#include "exception.h" + +int main( int argc, char** argv ) { + SBufferWriter bw = tbufInitWriter( NULL, false ); + + TRY( 1 ) { + //--------------------- write ------------------------ + // reserve 1024 bytes for the buffer to improve performance + tbufEnsureCapacity( &bw, 1024 ); + + // reserve space for the interger count + size_t pos = tbufReserve( &bw, sizeof(int32_t) ); + // write 5 integers to the buffer + for( int i = 0; i < 5; i++) { + tbufWriteInt32( &bw, i ); + } + // write the integer count to buffer at reserved position + tbufWriteInt32At( &bw, pos, 5 ); + + // write a string to the buffer + tbufWriteString( &bw, "this is a string.\n" ); + // acquire the result and close the write buffer + size_t size = tbufTell( &bw ); + char* data = tbufGetData( &bw, false ); + + //------------------------ read ----------------------- + SBufferReader br = tbufInitReader( data, size, false ); + // read & print out all integers + int32_t count = tbufReadInt32( &br ); + for( int i = 0; i < count; i++ ) { + printf( "%d\n", tbufReadInt32(&br) ); + } + // read & print out a string + puts( tbufReadString(&br, NULL) ); + // try read another integer, this result in an error as there no this integer + tbufReadInt32( &br ); + printf( "you should not see this message.\n" ); + } CATCH( code ) { + printf( "exception code is: %d, you will see this message after print out 5 integers and a string.\n", code ); + } END_TRY + + tbufCloseWriter( &bw ); return 0; } */ + typedef struct { - jmp_buf jb; - char* data; - size_t pos; - size_t size; -} SBuffer; - -// common functions can be used in both read & write -#define tbufThrowError(buf, code) longjmp((buf)->jb, (code)) -size_t tbufTell(SBuffer* buf); -size_t tbufSeekTo(SBuffer* buf, size_t pos); -size_t tbufSkip(SBuffer* buf, size_t size); -void tbufClose(SBuffer* buf, bool keepData); - -// basic read functions -#define tbufBeginRead(buf, _data, len) ((buf)->data = (char*)(_data), ((buf)->pos = 0), ((buf)->size = ((_data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) -char* tbufRead(SBuffer* buf, size_t size); -void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); -const char* tbufReadString(SBuffer* buf, size_t* len); -size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); - -// basic write functions -#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb)) -void tbufEnsureCapacity(SBuffer* buf, size_t size); -char* tbufGetData(SBuffer* buf, bool takeOver); -void tbufWrite(SBuffer* buf, const void* data, size_t size); -void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); -void tbufWriteString(SBuffer* buf, const char* str); - -// read & write function for primitive types -#ifndef TBUFFER_DEFINE_FUNCTION -#define TBUFFER_DEFINE_FUNCTION(type, name) \ - type tbufRead##name(SBuffer* buf); \ - void tbufWrite##name(SBuffer* buf, type data); \ - void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); -#endif + bool endian; + const char* data; + size_t pos; + size_t size; +} SBufferReader; -TBUFFER_DEFINE_FUNCTION(bool, Bool) -TBUFFER_DEFINE_FUNCTION(char, Char) -TBUFFER_DEFINE_FUNCTION(int8_t, Int8) -TBUFFER_DEFINE_FUNCTION(uint8_t, Uint8) -TBUFFER_DEFINE_FUNCTION(int16_t, Int16) -TBUFFER_DEFINE_FUNCTION(uint16_t, Uint16) -TBUFFER_DEFINE_FUNCTION(int32_t, Int32) -TBUFFER_DEFINE_FUNCTION(uint32_t, Uint32) -TBUFFER_DEFINE_FUNCTION(int64_t, Int64) -TBUFFER_DEFINE_FUNCTION(uint64_t, Uint64) -TBUFFER_DEFINE_FUNCTION(float, Float) -TBUFFER_DEFINE_FUNCTION(double, Double) +typedef struct { + bool endian; + char* data; + size_t pos; + size_t size; + void* (*allocator)( void*, size_t ); +} SBufferWriter; + +//////////////////////////////////////////////////////////////////////////////// +// common functions & macros for both reader & writer + +#define tbufTell( buf ) ((buf)->pos) + + +//////////////////////////////////////////////////////////////////////////////// +// reader functions & macros + +// *Endian*, if true, reader functions of primitive types will do 'ntoh' automatically +#define tbufInitReader( Data, Size, Endian ) {.endian = (Endian), .data = (Data), .pos = 0, .size = ((Data) == NULL ? 0 :(Size))} + +size_t tbufSkip( SBufferReader* buf, size_t size ); + +const char* tbufRead( SBufferReader* buf, size_t size ); +void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size ); +const char* tbufReadString( SBufferReader* buf, size_t* len ); +size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ); +const char* tbufReadBinary( SBufferReader* buf, size_t *len ); +size_t tbufReadToBinary( SBufferReader* buf, void* dst, size_t size ); + +bool tbufReadBool( SBufferReader* buf ); +char tbufReadChar( SBufferReader* buf ); +int8_t tbufReadInt8( SBufferReader* buf ); +uint8_t tbufReadUint8( SBufferReader* buf ); +int16_t tbufReadInt16( SBufferReader* buf ); +uint16_t tbufReadUint16( SBufferReader* buf ); +int32_t tbufReadInt32( SBufferReader* buf ); +uint32_t tbufReadUint32( SBufferReader* buf ); +int64_t tbufReadInt64( SBufferReader* buf ); +uint64_t tbufReadUint64( SBufferReader* buf ); +float tbufReadFloat( SBufferReader* buf ); +double tbufReadDouble( SBufferReader* buf ); + + +//////////////////////////////////////////////////////////////////////////////// +// writer functions & macros + +// *Allocator*, function to allocate memory, will use 'realloc' if NULL +// *Endian*, if true, writer functions of primitive types will do 'hton' automatically +#define tbufInitWriter( Allocator, Endian ) {.endian = (Endian), .data = NULL, .pos = 0, .size = 0, .allocator = ((Allocator) == NULL ? realloc : (Allocator))} +void tbufCloseWriter( SBufferWriter* buf ); + +void tbufEnsureCapacity( SBufferWriter* buf, size_t size ); +size_t tbufReserve( SBufferWriter* buf, size_t size ); +char* tbufGetData( SBufferWriter* buf, bool takeOver ); + +void tbufWrite( SBufferWriter* buf, const void* data, size_t size ); +void tbufWriteAt( SBufferWriter* buf, size_t pos, const void* data, size_t size ); +void tbufWriteStringLen( SBufferWriter* buf, const char* str, size_t len ); +void tbufWriteString( SBufferWriter* buf, const char* str ); +// the prototype of tbufWriteBinary and tbufWrite are identical +// the difference is: tbufWriteBinary writes the length of the data to the buffer +// first, then the actual data, which means the reader don't need to know data +// size before read. Write only write the data itself, which means the reader +// need to know data size before read. +void tbufWriteBinary( SBufferWriter* buf, const void* data, size_t len ); + +void tbufWriteBool( SBufferWriter* buf, bool data ); +void tbufWriteBoolAt( SBufferWriter* buf, size_t pos, bool data ); +void tbufWriteChar( SBufferWriter* buf, char data ); +void tbufWriteCharAt( SBufferWriter* buf, size_t pos, char data ); +void tbufWriteInt8( SBufferWriter* buf, int8_t data ); +void tbufWriteInt8At( SBufferWriter* buf, size_t pos, int8_t data ); +void tbufWriteUint8( SBufferWriter* buf, uint8_t data ); +void tbufWriteUint8At( SBufferWriter* buf, size_t pos, uint8_t data ); +void tbufWriteInt16( SBufferWriter* buf, int16_t data ); +void tbufWriteInt16At( SBufferWriter* buf, size_t pos, int16_t data ); +void tbufWriteUint16( SBufferWriter* buf, uint16_t data ); +void tbufWriteUint16At( SBufferWriter* buf, size_t pos, uint16_t data ); +void tbufWriteInt32( SBufferWriter* buf, int32_t data ); +void tbufWriteInt32At( SBufferWriter* buf, size_t pos, int32_t data ); +void tbufWriteUint32( SBufferWriter* buf, uint32_t data ); +void tbufWriteUint32At( SBufferWriter* buf, size_t pos, uint32_t data ); +void tbufWriteInt64( SBufferWriter* buf, int64_t data ); +void tbufWriteInt64At( SBufferWriter* buf, size_t pos, int64_t data ); +void tbufWriteUint64( SBufferWriter* buf, uint64_t data ); +void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ); +void tbufWriteFloat( SBufferWriter* buf, float data ); +void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ); +void tbufWriteDouble( SBufferWriter* buf, double data ); +void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data ); #ifdef __cplusplus } #endif -#endif \ No newline at end of file +#endif diff --git a/src/util/src/exception.c b/src/util/src/exception.c new file mode 100644 index 0000000000000000000000000000000000000000..7f8f91c784b7a07078a2801a373d608693178db7 --- /dev/null +++ b/src/util/src/exception.c @@ -0,0 +1,132 @@ +#include "exception.h" + + +static _Thread_local SExceptionNode* expList; + +void exceptionPushNode( SExceptionNode* node ) { + node->prev = expList; + expList = node; +} + +int32_t exceptionPopNode() { + SExceptionNode* node = expList; + expList = node->prev; + return node->code; +} + +void exceptionThrow( int code ) { + expList->code = code; + longjmp( expList->jb, 1 ); +} + + + +static void cleanupWrapper_void_ptr_ptr( SCleanupAction* ca ) { + void (*func)( void*, void* ) = ca->func; + func( ca->arg1.Ptr, ca->arg2.Ptr ); +} + +static void cleanupWrapper_void_ptr_bool( SCleanupAction* ca ) { + void (*func)( void*, bool ) = ca->func; + func( ca->arg1.Ptr, ca->arg2.Bool ); +} + +static void cleanupWrapper_void_ptr( SCleanupAction* ca ) { + void (*func)( void* ) = ca->func; + func( ca->arg1.Ptr ); +} + +static void cleanupWrapper_int_int( SCleanupAction* ca ) { + int (*func)( int ) = ca->func; + func( (int)(intptr_t)(ca->arg1.Int) ); +} + +static void cleanupWrapper_void_void( SCleanupAction* ca ) { + void (*func)() = ca->func; + func(); +} + +typedef void (*wrapper)(SCleanupAction*); +static wrapper wrappers[] = { + cleanupWrapper_void_ptr_ptr, + cleanupWrapper_void_ptr_bool, + cleanupWrapper_void_ptr, + cleanupWrapper_int_int, + cleanupWrapper_void_void, +}; + + +void cleanupPush_void_ptr_ptr( bool failOnly, void* func, void* arg1, void* arg2 ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 0; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg1; + ca->arg2.Ptr = arg2; +} + +void cleanupPush_void_ptr_bool( bool failOnly, void* func, void* arg1, bool arg2 ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 1; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg1; + ca->arg2.Bool = arg2; +} + +void cleanupPush_void_ptr( bool failOnly, void* func, void* arg ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 2; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg; +} + +void cleanupPush_int_int( bool failOnly, void* func, int arg ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 3; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Int = arg; +} + +void cleanupPush_void( bool failOnly, void* func ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 4; + ca->failOnly = failOnly; + ca->func = func; +} + + + +int32_t cleanupGetActionCount() { + return expList->numCleanupAction; +} + + +static void doExecuteCleanup( SExceptionNode* node, int32_t anchor, bool failed ) { + while( node->numCleanupAction > anchor ) { + --node->numCleanupAction; + SCleanupAction *ca = node->cleanupActions + node->numCleanupAction; + if( failed || !(ca->failOnly) ) + wrappers[ca->wrapper]( ca ); + } +} + +void cleanupExecuteTo( int32_t anchor, bool failed ) { + doExecuteCleanup( expList, anchor, failed ); +} + +void cleanupExecute( SExceptionNode* node, bool failed ) { + doExecuteCleanup( node, 0, failed ); +} \ No newline at end of file diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c index a83d7dddb0d8e987bfbb670f8f3e4b413539dec2..3b4cc74cc3717c365b3a1e0e36b63d5b4098e995 100644 --- a/src/util/src/tbuffer.c +++ b/src/util/src/tbuffer.c @@ -16,150 +16,384 @@ #include #include #include +#include +#include "tbuffer.h" +#include "exception.h" +#include + +//////////////////////////////////////////////////////////////////////////////// +// reader functions -#define TBUFFER_DEFINE_FUNCTION(type, name) \ - type tbufRead##name(SBuffer* buf) { \ - type ret; \ - tbufReadToBuffer(buf, &ret, sizeof(type)); \ - return ret; \ - }\ - void tbufWrite##name(SBuffer* buf, type data) {\ - tbufWrite(buf, &data, sizeof(data));\ - }\ - void tbufWrite##name##At(SBuffer* buf, size_t pos, type data) {\ - tbufWriteAt(buf, pos, &data, sizeof(data));\ +size_t tbufSkip(SBufferReader* buf, size_t size) { + if( (buf->pos + size) > buf->size ) { + THROW( TSDB_CODE_MEMORY_CORRUPTED ); } + size_t old = buf->pos; + buf->pos += size; + return old; +} -#include "tbuffer.h" +const char* tbufRead( SBufferReader* buf, size_t size ) { + const char* ret = buf->data + buf->pos; + tbufSkip( buf, size ); + return ret; +} +void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size ) { + assert( dst != NULL ); + // always using memcpy, leave optimization to compiler + memcpy( dst, tbufRead(buf, size), size ); +} -//////////////////////////////////////////////////////////////////////////////// -// common functions +static size_t tbufReadLength( SBufferReader* buf ) { + // maximum length is 65535, if larger length is required + // this function and the corresponding write function need to be + // revised. + uint16_t l = tbufReadUint16( buf ); + return l; +} -size_t tbufTell(SBuffer* buf) { - return buf->pos; +const char* tbufReadString( SBufferReader* buf, size_t* len ) { + size_t l = tbufReadLength( buf ); + const char* ret = buf->data + buf->pos; + tbufSkip( buf, l + 1 ); + if( ret[l] != 0 ) { + THROW( TSDB_CODE_MEMORY_CORRUPTED ); + } + if( len != NULL ) { + *len = l; + } + return ret; } -size_t tbufSeekTo(SBuffer* buf, size_t pos) { - if (pos > buf->size) { - // TODO: update error code, other tbufThrowError need to be changed too - tbufThrowError(buf, 1); +size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ) { + assert( dst != NULL ); + size_t len; + const char* str = tbufReadString( buf, &len ); + if (len >= size) { + len = size - 1; } - size_t old = buf->pos; - buf->pos = pos; - return old; + memcpy( dst, str, len ); + dst[len] = 0; + return len; } -size_t tbufSkip(SBuffer* buf, size_t size) { - return tbufSeekTo(buf, buf->pos + size); +const char* tbufReadBinary( SBufferReader* buf, size_t *len ) { + size_t l = tbufReadLength( buf ); + const char* ret = buf->data + buf->pos; + tbufSkip( buf, l ); + if( len != NULL ) { + *len = l; + } + return ret; } -void tbufClose(SBuffer* buf, bool keepData) { - if (!keepData) { - free(buf->data); +size_t tbufReadToBinary( SBufferReader* buf, void* dst, size_t size ) { + assert( dst != NULL ); + size_t len; + const char* data = tbufReadBinary( buf, &len ); + if( len >= size ) { + len = size; } - buf->data = NULL; - buf->pos = 0; - buf->size = 0; + memcpy( dst, data, len ); + return len; } -//////////////////////////////////////////////////////////////////////////////// -// read functions +bool tbufReadBool( SBufferReader* buf ) { + bool ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; +} -char* tbufRead(SBuffer* buf, size_t size) { - char* ret = buf->data + buf->pos; - tbufSkip(buf, size); +char tbufReadChar( SBufferReader* buf ) { + char ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); return ret; } -void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) { - assert(dst != NULL); - // always using memcpy, leave optimization to compiler - memcpy(dst, tbufRead(buf, size), size); +int8_t tbufReadInt8( SBufferReader* buf ) { + int8_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; } -const char* tbufReadString(SBuffer* buf, size_t* len) { - uint16_t l = tbufReadUint16(buf); - char* ret = buf->data + buf->pos; - tbufSkip(buf, l + 1); - ret[l] = 0; // ensure the string end with '\0' - if (len != NULL) { - *len = l; +uint8_t tbufReadUint8( SBufferReader* buf ) { + uint8_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; +} + +int16_t tbufReadInt16( SBufferReader* buf ) { + int16_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return (int16_t)ntohs( ret ); } return ret; } -size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { - assert(dst != NULL); - size_t len; - const char* str = tbufReadString(buf, &len); - if (len >= size) { - len = size - 1; +uint16_t tbufReadUint16( SBufferReader* buf ) { + uint16_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return ntohs( ret ); } - memcpy(dst, str, len); - dst[len] = 0; - return len; + return ret; +} + +int32_t tbufReadInt32( SBufferReader* buf ) { + int32_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return (int32_t)ntohl( ret ); + } + return ret; +} + +uint32_t tbufReadUint32( SBufferReader* buf ) { + uint32_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return ntohl( ret ); + } + return ret; +} + +int64_t tbufReadInt64( SBufferReader* buf ) { + int64_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return (int64_t)htobe64( ret ); // TODO: ntohll + } + return ret; +} + +uint64_t tbufReadUint64( SBufferReader* buf ) { + uint64_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return htobe64( ret ); // TODO: ntohll + } + return ret; +} + +float tbufReadFloat( SBufferReader* buf ) { + uint32_t ret = tbufReadUint32( buf ); + return *(float*)( &ret ); } +double tbufReadDouble(SBufferReader* buf) { + uint64_t ret = tbufReadUint64( buf ); + return *(double*)( &ret ); +} //////////////////////////////////////////////////////////////////////////////// -// write functions +// writer functions -void tbufEnsureCapacity(SBuffer* buf, size_t size) { +void tbufCloseWriter( SBufferWriter* buf ) { + (*buf->allocator)( buf->data, 0 ); + buf->data = NULL; + buf->pos = 0; + buf->size = 0; +} + +void tbufEnsureCapacity( SBufferWriter* buf, size_t size ) { size += buf->pos; - if (size > buf->size) { + if( size > buf->size ) { size_t nsize = size + buf->size; - char* data = realloc(buf->data, nsize); - if (data == NULL) { - tbufThrowError(buf, 2); + char* data = (*buf->allocator)( buf->data, nsize ); + // TODO: the exception should be thrown by the allocator function + if( data == NULL ) { + THROW( TSDB_CODE_SERV_OUT_OF_MEMORY ); } buf->data = data; buf->size = nsize; } } -char* tbufGetData(SBuffer* buf, bool takeOver) { +size_t tbufReserve( SBufferWriter* buf, size_t size ) { + tbufEnsureCapacity( buf, size ); + size_t old = buf->pos; + buf->pos += size; + return old; +} + +char* tbufGetData( SBufferWriter* buf, bool takeOver ) { char* ret = buf->data; - if (takeOver) { + if( takeOver ) { buf->pos = 0; buf->size = 0; buf->data = NULL; } - return ret; } -void tbufEndWrite(SBuffer* buf) { - free(buf->data); - buf->data = NULL; - buf->pos = 0; - buf->size = 0; -} - -void tbufWrite(SBuffer* buf, const void* data, size_t size) { - assert(data != NULL); - tbufEnsureCapacity(buf, size); - memcpy(buf->data + buf->pos, data, size); +void tbufWrite( SBufferWriter* buf, const void* data, size_t size ) { + assert( data != NULL ); + tbufEnsureCapacity( buf, size ); + memcpy( buf->data + buf->pos, data, size ); buf->pos += size; } -void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) { - assert(data != NULL); +void tbufWriteAt( SBufferWriter* buf, size_t pos, const void* data, size_t size ) { + assert( data != NULL ); // this function can only be called to fill the gap on previous writes, // so 'pos + size <= buf->pos' must be true - assert(pos + size <= buf->pos); - memcpy(buf->data + pos, data, size); + assert( pos + size <= buf->pos ); + memcpy( buf->data + pos, data, size ); } -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { - // maximum string length is 65535, if longer string is required +static void tbufWriteLength( SBufferWriter* buf, size_t len ) { + // maximum length is 65535, if larger length is required // this function and the corresponding read function need to be // revised. - assert(len <= 0xffff); - tbufWriteUint16(buf, (uint16_t)len); - tbufWrite(buf, str, len + 1); + assert( len <= 0xffff ); + tbufWriteUint16( buf, (uint16_t)len ); +} + +void tbufWriteStringLen( SBufferWriter* buf, const char* str, size_t len ) { + tbufWriteLength( buf, len ); + tbufWrite( buf, str, len ); + tbufWriteChar( buf, '\0' ); +} + +void tbufWriteString( SBufferWriter* buf, const char* str ) { + tbufWriteStringLen( buf, str, strlen(str) ); +} + +void tbufWriteBinary( SBufferWriter* buf, const void* data, size_t len ) { + tbufWriteLength( buf, len ); + tbufWrite( buf, data, len ); +} + +void tbufWriteBool( SBufferWriter* buf, bool data ) { + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteBoolAt( SBufferWriter* buf, size_t pos, bool data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteChar( SBufferWriter* buf, char data ) { + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteCharAt( SBufferWriter* buf, size_t pos, char data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteInt8( SBufferWriter* buf, int8_t data ) { + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteInt8At( SBufferWriter* buf, size_t pos, int8_t data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteUint8( SBufferWriter* buf, uint8_t data ) { + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteUint8At( SBufferWriter* buf, size_t pos, uint8_t data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteInt16( SBufferWriter* buf, int16_t data ) { + if( buf->endian ) { + data = (int16_t)htons( data ); + } + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteInt16At( SBufferWriter* buf, size_t pos, int16_t data ) { + if( buf->endian ) { + data = (int16_t)htons( data ); + } + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteUint16( SBufferWriter* buf, uint16_t data ) { + if( buf->endian ) { + data = htons( data ); + } + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteUint16At( SBufferWriter* buf, size_t pos, uint16_t data ) { + if( buf->endian ) { + data = htons( data ); + } + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteInt32( SBufferWriter* buf, int32_t data ) { + if( buf->endian ) { + data = (int32_t)htonl( data ); + } + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteInt32At( SBufferWriter* buf, size_t pos, int32_t data ) { + if( buf->endian ) { + data = (int32_t)htonl( data ); + } + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteUint32( SBufferWriter* buf, uint32_t data ) { + if( buf->endian ) { + data = htonl( data ); + } + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteUint32At( SBufferWriter* buf, size_t pos, uint32_t data ) { + if( buf->endian ) { + data = htonl( data ); + } + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteInt64( SBufferWriter* buf, int64_t data ) { + if( buf->endian ) { + data = (int64_t)htobe64( data ); + } + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteInt64At( SBufferWriter* buf, size_t pos, int64_t data ) { + if( buf->endian ) { + data = (int64_t)htobe64( data ); + } + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteUint64( SBufferWriter* buf, uint64_t data ) { + if( buf->endian ) { + data = htobe64( data ); + } + tbufWrite( buf, &data, sizeof(data) ); +} + +void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ) { + if( buf->endian ) { + data = htobe64( data ); + } + tbufWriteAt( buf, pos, &data, sizeof(data) ); +} + +void tbufWriteFloat( SBufferWriter* buf, float data ) { + tbufWriteUint32( buf, *(uint32_t*)(&data) ); +} + +void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ) { + tbufWriteUint32At( buf, pos, *(uint32_t*)(&data) ); +} + +void tbufWriteDouble( SBufferWriter* buf, double data ) { + tbufWriteUint64( buf, *(uint64_t*)(&data) ); } -void tbufWriteString(SBuffer* buf, const char* str) { - tbufWriteStringLen(buf, str, strlen(str)); +void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data ) { + tbufWriteUint64At( buf, pos, *(uint64_t*)(&data) ); }