提交 0bf399a6 编写于 作者: X Xiaoyu Wang

merge 3.0

......@@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT})
endif(${BUILD_WITH_TRAFT})
add_subdirectory(tdev)
add_subdirectory(lz4)
add_executable(lz4_test "")
target_sources(lz4_test
PRIVATE
"main.c"
)
target_link_libraries(lz4_test lz4_static)
\ No newline at end of file
#include <stdio.h>
#include "lz4.h"
int main(int argc, char const *argv[]) {
printf("%d\n", LZ4_compressBound(1024));
return 0;
}
Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df
Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12
......@@ -73,15 +73,17 @@ typedef struct {
uint64_t suid;
} STableListInfo;
#pragma pack(push, 1)
typedef struct SColumnDataAgg {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t maxIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SColumnDataAgg;
#pragma pack(pop)
typedef struct SDataBlockInfo {
STimeWindow window;
......@@ -122,13 +124,11 @@ typedef struct SColumnInfoData {
} SColumnInfoData;
typedef struct SQueryTableDataCond {
// STimeWindow twindow;
uint64_t suid;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
int32_t type; // data block load type:
int32_t numOfTWindows;
STimeWindow* twindows;
int64_t startVersion;
......
......@@ -34,21 +34,40 @@ typedef struct SValue SValue;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
typedef struct SColData SColData;
typedef struct STagVal STagVal;
typedef struct STag STag;
// bitmap
#define N1(n) ((1 << (n)) - 1)
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
#define SET_BIT1(p, i, v) \
do { \
(p)[(i) / 8] &= N1((i) % 8); \
(p)[(i) / 8] |= (((uint8_t)(v)) << (((i) % 8))); \
} while (0)
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define SET_BIT2(p, i, v) \
do { \
p[(i) / 4] &= N1((i) % 4 * 2); \
(p)[(i) / 4] |= (((uint8_t)(v)) << (((i) % 4) * 2)); \
} while (0)
#define GET_BIT2(p, i) (((p)[(i) / 4] >> (((i) % 4) * 2)) & ((uint8_t)3))
// STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SValue
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
// STSRow2
#define COL_VAL_NONE(CID) ((SColVal){.cid = (CID), .isNone = 1})
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)})
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
......@@ -140,6 +159,7 @@ struct SValue {
struct SColVal {
int16_t cid;
int8_t type;
int8_t isNone;
int8_t isNull;
SValue value;
......@@ -172,12 +192,6 @@ struct STag {
};
#pragma pack(pop)
struct SColData {
int16_t cid;
uint32_t nData;
uint8_t *pData;
};
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
......
......@@ -3024,6 +3024,17 @@ typedef struct {
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq);
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq);
typedef struct SDeleteRes {
uint64_t suid;
SArray* uidList;
int64_t skey;
int64_t ekey;
int64_t affectedRows;
} SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -200,6 +200,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "commit vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
......
......@@ -299,6 +299,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp
int32_t tdGetTpRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int8_t colType, int32_t offset,
int16_t colIdx);
int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_t offset, int16_t colIdx);
void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal);
typedef struct {
STSchema *pSchema;
......@@ -312,6 +313,7 @@ typedef struct {
void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow);
void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema);
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow);
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal);
......@@ -320,7 +322,7 @@ STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema
int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode);
bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx,
SCellVal *pVal);
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal);
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal);
int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode);
void tdSCellValPrint(SCellVal *pVal, int8_t colType);
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag);
......
......@@ -73,201 +73,202 @@
#define TK_MNODE 55
#define TK_DATABASE 56
#define TK_USE 57
#define TK_IF 58
#define TK_NOT 59
#define TK_EXISTS 60
#define TK_BUFFER 61
#define TK_CACHELAST 62
#define TK_CACHELASTSIZE 63
#define TK_COMP 64
#define TK_DURATION 65
#define TK_NK_VARIABLE 66
#define TK_FSYNC 67
#define TK_MAXROWS 68
#define TK_MINROWS 69
#define TK_KEEP 70
#define TK_PAGES 71
#define TK_PAGESIZE 72
#define TK_PRECISION 73
#define TK_REPLICA 74
#define TK_STRICT 75
#define TK_WAL 76
#define TK_VGROUPS 77
#define TK_SINGLE_STABLE 78
#define TK_RETENTIONS 79
#define TK_SCHEMALESS 80
#define TK_NK_COLON 81
#define TK_TABLE 82
#define TK_NK_LP 83
#define TK_NK_RP 84
#define TK_STABLE 85
#define TK_ADD 86
#define TK_COLUMN 87
#define TK_MODIFY 88
#define TK_RENAME 89
#define TK_TAG 90
#define TK_SET 91
#define TK_NK_EQ 92
#define TK_USING 93
#define TK_TAGS 94
#define TK_COMMENT 95
#define TK_BOOL 96
#define TK_TINYINT 97
#define TK_SMALLINT 98
#define TK_INT 99
#define TK_INTEGER 100
#define TK_BIGINT 101
#define TK_FLOAT 102
#define TK_DOUBLE 103
#define TK_BINARY 104
#define TK_TIMESTAMP 105
#define TK_NCHAR 106
#define TK_UNSIGNED 107
#define TK_JSON 108
#define TK_VARCHAR 109
#define TK_MEDIUMBLOB 110
#define TK_BLOB 111
#define TK_VARBINARY 112
#define TK_DECIMAL 113
#define TK_MAX_DELAY 114
#define TK_WATERMARK 115
#define TK_ROLLUP 116
#define TK_TTL 117
#define TK_SMA 118
#define TK_FIRST 119
#define TK_LAST 120
#define TK_SHOW 121
#define TK_DATABASES 122
#define TK_TABLES 123
#define TK_STABLES 124
#define TK_MNODES 125
#define TK_MODULES 126
#define TK_QNODES 127
#define TK_FUNCTIONS 128
#define TK_INDEXES 129
#define TK_ACCOUNTS 130
#define TK_APPS 131
#define TK_CONNECTIONS 132
#define TK_LICENCE 133
#define TK_GRANTS 134
#define TK_QUERIES 135
#define TK_SCORES 136
#define TK_TOPICS 137
#define TK_VARIABLES 138
#define TK_BNODES 139
#define TK_SNODES 140
#define TK_CLUSTER 141
#define TK_TRANSACTIONS 142
#define TK_DISTRIBUTED 143
#define TK_CONSUMERS 144
#define TK_SUBSCRIPTIONS 145
#define TK_LIKE 146
#define TK_INDEX 147
#define TK_FUNCTION 148
#define TK_INTERVAL 149
#define TK_TOPIC 150
#define TK_AS 151
#define TK_WITH 152
#define TK_META 153
#define TK_CONSUMER 154
#define TK_GROUP 155
#define TK_DESC 156
#define TK_DESCRIBE 157
#define TK_RESET 158
#define TK_QUERY 159
#define TK_CACHE 160
#define TK_EXPLAIN 161
#define TK_ANALYZE 162
#define TK_VERBOSE 163
#define TK_NK_BOOL 164
#define TK_RATIO 165
#define TK_NK_FLOAT 166
#define TK_COMPACT 167
#define TK_VNODES 168
#define TK_IN 169
#define TK_OUTPUTTYPE 170
#define TK_AGGREGATE 171
#define TK_BUFSIZE 172
#define TK_STREAM 173
#define TK_INTO 174
#define TK_TRIGGER 175
#define TK_AT_ONCE 176
#define TK_WINDOW_CLOSE 177
#define TK_IGNORE 178
#define TK_EXPIRED 179
#define TK_KILL 180
#define TK_CONNECTION 181
#define TK_TRANSACTION 182
#define TK_BALANCE 183
#define TK_VGROUP 184
#define TK_MERGE 185
#define TK_REDISTRIBUTE 186
#define TK_SPLIT 187
#define TK_SYNCDB 188
#define TK_DELETE 189
#define TK_INSERT 190
#define TK_NULL 191
#define TK_NK_QUESTION 192
#define TK_NK_ARROW 193
#define TK_ROWTS 194
#define TK_TBNAME 195
#define TK_QSTARTTS 196
#define TK_QENDTS 197
#define TK_WSTARTTS 198
#define TK_WENDTS 199
#define TK_WDURATION 200
#define TK_CAST 201
#define TK_NOW 202
#define TK_TODAY 203
#define TK_TIMEZONE 204
#define TK_CLIENT_VERSION 205
#define TK_SERVER_VERSION 206
#define TK_SERVER_STATUS 207
#define TK_CURRENT_USER 208
#define TK_COUNT 209
#define TK_LAST_ROW 210
#define TK_BETWEEN 211
#define TK_IS 212
#define TK_NK_LT 213
#define TK_NK_GT 214
#define TK_NK_LE 215
#define TK_NK_GE 216
#define TK_NK_NE 217
#define TK_MATCH 218
#define TK_NMATCH 219
#define TK_CONTAINS 220
#define TK_JOIN 221
#define TK_INNER 222
#define TK_SELECT 223
#define TK_DISTINCT 224
#define TK_WHERE 225
#define TK_PARTITION 226
#define TK_BY 227
#define TK_SESSION 228
#define TK_STATE_WINDOW 229
#define TK_SLIDING 230
#define TK_FILL 231
#define TK_VALUE 232
#define TK_NONE 233
#define TK_PREV 234
#define TK_LINEAR 235
#define TK_NEXT 236
#define TK_HAVING 237
#define TK_RANGE 238
#define TK_EVERY 239
#define TK_ORDER 240
#define TK_SLIMIT 241
#define TK_SOFFSET 242
#define TK_LIMIT 243
#define TK_OFFSET 244
#define TK_ASC 245
#define TK_NULLS 246
#define TK_ID 247
#define TK_NK_BITNOT 248
#define TK_VALUES 249
#define TK_IMPORT 250
#define TK_NK_SEMI 251
#define TK_FILE 252
#define TK_FLUSH 58
#define TK_IF 59
#define TK_NOT 60
#define TK_EXISTS 61
#define TK_BUFFER 62
#define TK_CACHELAST 63
#define TK_CACHELASTSIZE 64
#define TK_COMP 65
#define TK_DURATION 66
#define TK_NK_VARIABLE 67
#define TK_FSYNC 68
#define TK_MAXROWS 69
#define TK_MINROWS 70
#define TK_KEEP 71
#define TK_PAGES 72
#define TK_PAGESIZE 73
#define TK_PRECISION 74
#define TK_REPLICA 75
#define TK_STRICT 76
#define TK_WAL 77
#define TK_VGROUPS 78
#define TK_SINGLE_STABLE 79
#define TK_RETENTIONS 80
#define TK_SCHEMALESS 81
#define TK_NK_COLON 82
#define TK_TABLE 83
#define TK_NK_LP 84
#define TK_NK_RP 85
#define TK_STABLE 86
#define TK_ADD 87
#define TK_COLUMN 88
#define TK_MODIFY 89
#define TK_RENAME 90
#define TK_TAG 91
#define TK_SET 92
#define TK_NK_EQ 93
#define TK_USING 94
#define TK_TAGS 95
#define TK_COMMENT 96
#define TK_BOOL 97
#define TK_TINYINT 98
#define TK_SMALLINT 99
#define TK_INT 100
#define TK_INTEGER 101
#define TK_BIGINT 102
#define TK_FLOAT 103
#define TK_DOUBLE 104
#define TK_BINARY 105
#define TK_TIMESTAMP 106
#define TK_NCHAR 107
#define TK_UNSIGNED 108
#define TK_JSON 109
#define TK_VARCHAR 110
#define TK_MEDIUMBLOB 111
#define TK_BLOB 112
#define TK_VARBINARY 113
#define TK_DECIMAL 114
#define TK_MAX_DELAY 115
#define TK_WATERMARK 116
#define TK_ROLLUP 117
#define TK_TTL 118
#define TK_SMA 119
#define TK_FIRST 120
#define TK_LAST 121
#define TK_SHOW 122
#define TK_DATABASES 123
#define TK_TABLES 124
#define TK_STABLES 125
#define TK_MNODES 126
#define TK_MODULES 127
#define TK_QNODES 128
#define TK_FUNCTIONS 129
#define TK_INDEXES 130
#define TK_ACCOUNTS 131
#define TK_APPS 132
#define TK_CONNECTIONS 133
#define TK_LICENCE 134
#define TK_GRANTS 135
#define TK_QUERIES 136
#define TK_SCORES 137
#define TK_TOPICS 138
#define TK_VARIABLES 139
#define TK_BNODES 140
#define TK_SNODES 141
#define TK_CLUSTER 142
#define TK_TRANSACTIONS 143
#define TK_DISTRIBUTED 144
#define TK_CONSUMERS 145
#define TK_SUBSCRIPTIONS 146
#define TK_LIKE 147
#define TK_INDEX 148
#define TK_FUNCTION 149
#define TK_INTERVAL 150
#define TK_TOPIC 151
#define TK_AS 152
#define TK_WITH 153
#define TK_META 154
#define TK_CONSUMER 155
#define TK_GROUP 156
#define TK_DESC 157
#define TK_DESCRIBE 158
#define TK_RESET 159
#define TK_QUERY 160
#define TK_CACHE 161
#define TK_EXPLAIN 162
#define TK_ANALYZE 163
#define TK_VERBOSE 164
#define TK_NK_BOOL 165
#define TK_RATIO 166
#define TK_NK_FLOAT 167
#define TK_COMPACT 168
#define TK_VNODES 169
#define TK_IN 170
#define TK_OUTPUTTYPE 171
#define TK_AGGREGATE 172
#define TK_BUFSIZE 173
#define TK_STREAM 174
#define TK_INTO 175
#define TK_TRIGGER 176
#define TK_AT_ONCE 177
#define TK_WINDOW_CLOSE 178
#define TK_IGNORE 179
#define TK_EXPIRED 180
#define TK_KILL 181
#define TK_CONNECTION 182
#define TK_TRANSACTION 183
#define TK_BALANCE 184
#define TK_VGROUP 185
#define TK_MERGE 186
#define TK_REDISTRIBUTE 187
#define TK_SPLIT 188
#define TK_SYNCDB 189
#define TK_DELETE 190
#define TK_INSERT 191
#define TK_NULL 192
#define TK_NK_QUESTION 193
#define TK_NK_ARROW 194
#define TK_ROWTS 195
#define TK_TBNAME 196
#define TK_QSTARTTS 197
#define TK_QENDTS 198
#define TK_WSTARTTS 199
#define TK_WENDTS 200
#define TK_WDURATION 201
#define TK_CAST 202
#define TK_NOW 203
#define TK_TODAY 204
#define TK_TIMEZONE 205
#define TK_CLIENT_VERSION 206
#define TK_SERVER_VERSION 207
#define TK_SERVER_STATUS 208
#define TK_CURRENT_USER 209
#define TK_COUNT 210
#define TK_LAST_ROW 211
#define TK_BETWEEN 212
#define TK_IS 213
#define TK_NK_LT 214
#define TK_NK_GT 215
#define TK_NK_LE 216
#define TK_NK_GE 217
#define TK_NK_NE 218
#define TK_MATCH 219
#define TK_NMATCH 220
#define TK_CONTAINS 221
#define TK_JOIN 222
#define TK_INNER 223
#define TK_SELECT 224
#define TK_DISTINCT 225
#define TK_WHERE 226
#define TK_PARTITION 227
#define TK_BY 228
#define TK_SESSION 229
#define TK_STATE_WINDOW 230
#define TK_SLIDING 231
#define TK_FILL 232
#define TK_VALUE 233
#define TK_NONE 234
#define TK_PREV 235
#define TK_LINEAR 236
#define TK_NEXT 237
#define TK_HAVING 238
#define TK_RANGE 239
#define TK_EVERY 240
#define TK_ORDER 241
#define TK_SLIMIT 242
#define TK_SOFFSET 243
#define TK_LIMIT 244
#define TK_OFFSET 245
#define TK_ASC 246
#define TK_NULLS 247
#define TK_ID 248
#define TK_NK_BITNOT 249
#define TK_VALUES 250
#define TK_IMPORT 251
#define TK_NK_SEMI 252
#define TK_FILE 253
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -13,6 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_COMMAND_H
#define TDENGINE_COMMAND_H
#include "cmdnodes.h"
#include "tmsg.h"
#include "plannodes.h"
......@@ -27,4 +30,4 @@ int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx);
#endif
......@@ -36,6 +36,8 @@ typedef struct SReadHandle {
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
// int8_t initTsdbReader;
bool tqReader;
} SReadHandle;
......
......@@ -98,6 +98,11 @@ typedef struct SAlterDatabaseStmt {
SDatabaseOptions* pOptions;
} SAlterDatabaseStmt;
typedef struct SFlushDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
} SFlushDatabaseStmt;
typedef struct STableOptions {
ENodeType type;
bool commentNull;
......
......@@ -111,6 +111,7 @@ typedef enum ENodeType {
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_DROP_DATABASE_STMT,
QUERY_NODE_ALTER_DATABASE_STMT,
QUERY_NODE_FLUSH_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
QUERY_NODE_CREATE_MULTI_TABLE_STMT,
......
......@@ -20,9 +20,9 @@
extern "C" {
#endif
#include "executor.h"
#include "tmsgcb.h"
#include "trpc.h"
#include "executor.h"
enum {
NODE_TYPE_VNODE = 1,
......@@ -31,13 +31,6 @@ enum {
NODE_TYPE_MNODE,
};
typedef struct SDeleteRes {
uint64_t suid;
SArray* uidList;
int64_t skey;
int64_t ekey;
} SDeleteRes;
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
uint32_t maxTaskNum;
......@@ -46,19 +39,19 @@ typedef struct SQWorkerCfg {
typedef struct {
uint64_t cacheDataSize;
uint64_t queryProcessed;
uint64_t cqueryProcessed;
uint64_t fetchProcessed;
uint64_t dropProcessed;
uint64_t hbProcessed;
uint64_t deleteProcessed;
uint64_t numOfQueryInQueue;
uint64_t numOfFetchInQueue;
uint64_t timeInQueryQueue;
uint64_t timeInFetchQueue;
uint64_t numOfErrors;
} SQWorkerStat;
......@@ -82,7 +75,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes);
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
void qWorkerDestroy(void **qWorkerMgmt);
......
......@@ -215,6 +215,7 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, in
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
......
......@@ -27,7 +27,7 @@ extern "C" {
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)
#define IsReq(pMsg) (pMsg->msgType & 1U)
extern int32_t tsRpcHeadSize;
......@@ -35,12 +35,13 @@ typedef struct {
uint32_t clientIp;
uint16_t clientPort;
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcHandleInfo {
// rpc info
void * handle; // rpc handle returned to app
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
......@@ -53,7 +54,7 @@ typedef struct SRpcHandleInfo {
void *node; // node mgmt handle
// resp info
void * rsp;
void *rsp;
int32_t rspLen;
// conn info
......@@ -62,7 +63,7 @@ typedef struct SRpcHandleInfo {
typedef struct SRpcMsg {
tmsg_t msgType;
void * pCont;
void *pCont;
int32_t contLen;
int32_t code;
SRpcHandleInfo info;
......@@ -74,7 +75,7 @@ typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char * label; // for debug purpose
char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
......@@ -99,12 +100,12 @@ typedef struct {
typedef struct {
int32_t msgType;
void * val;
void *val;
int32_t (*clone)(void *src, void **dst);
} SRpcBrokenlinkVal;
typedef struct {
SHashObj * args;
SHashObj *args;
SRpcBrokenlinkVal brokenVal;
void (*freeFunc)(const void *arg);
} SRpcCtx;
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MALLOCATOR_H_
#define _TD_UTIL_MALLOCATOR_H_
#ifndef _TD_UTIL_TREALLOC_H_
#define _TD_UTIL_TREALLOC_H_
#include "os.h"
......@@ -22,34 +22,43 @@
extern "C" {
#endif
// Memory allocator
#define TD_MEM_ALCT(TYPE) \
struct { \
void *(*malloc_)(struct TYPE *, uint64_t size); \
void (*free_)(struct TYPE *, void *ptr); \
static FORCE_INLINE int32_t tRealloc(uint8_t **ppBuf, int64_t size) {
int32_t code = 0;
int64_t bsize = 0;
uint8_t *pBuf;
if (*ppBuf) {
bsize = *(int64_t *)((*ppBuf) - sizeof(int64_t));
}
if (bsize >= size) goto _exit;
if (bsize == 0) bsize = 64;
while (bsize < size) {
bsize *= 2;
}
#define TD_MA_MALLOC_FUNC(TMA) (TMA)->malloc_
#define TD_MA_FREE_FUNC(TMA) (TMA)->free_
#define TD_MA_MALLOC(TMA, SIZE) (*((TMA)->malloc_))(TMA, (SIZE))
#define TD_MA_FREE(TMA, PTR) (*((TMA)->free_))(TMA, (PTR))
pBuf = (uint8_t *)taosMemoryRealloc(*ppBuf ? (*ppBuf) - sizeof(int64_t) : *ppBuf, bsize + sizeof(int64_t));
if (pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
typedef struct SMemAllocator {
void *impl;
TD_MEM_ALCT(SMemAllocator);
} SMemAllocator;
*(int64_t *)pBuf = bsize;
*ppBuf = pBuf + sizeof(int64_t);
#define tMalloc(pMA, SIZE) TD_MA_MALLOC(PMA, SIZE)
#define tFree(pMA, PTR) TD_MA_FREE(PMA, PTR)
_exit:
return code;
}
typedef struct SMemAllocatorFactory {
void *impl;
SMemAllocator *(*create)(struct SMemAllocatorFactory *);
void (*destroy)(struct SMemAllocatorFactory *, SMemAllocator *);
} SMemAllocatorFactory;
static FORCE_INLINE void tFree(uint8_t *pBuf) {
if (pBuf) {
taosMemoryFree(pBuf - sizeof(int64_t));
}
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_MALLOCATOR_H_*/
\ No newline at end of file
#endif /*_TD_UTIL_TREALLOC_H_*/
......@@ -72,6 +72,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
......@@ -843,19 +843,25 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
if (taos_num_fields(pRequest) == 0) {
// this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
// All data has returned to App already, no need to try again
if (pResultInfo->completed) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// all data has returned to App already, no need to try again
if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// it is a local executed query, no need to do async fetch
if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
SSchedulerReq req = {
......@@ -869,14 +875,14 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
SRequestObj *pRequest = res;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
pResultInfo->current = pResultInfo->numOfRows;
pResultInfo->convertUcs4 = false;
taos_fetch_rows_a(res, fp, param);
}
......
......@@ -669,13 +669,13 @@ TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
......@@ -700,54 +700,55 @@ TEST(testCase, projection_query_tables) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
pRes = taos_query(pConn, "select * from tu");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
// taos_free_result(pRes);
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts from st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select ts from st1");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
......@@ -773,7 +774,7 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
/*
--- copy the following script in the shell to setup the environment ---
......@@ -819,5 +820,27 @@ TEST(testCase, async_api_test) {
getchar();
taos_close(pConn);
}
#endif
TEST(testCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s", taos_errstr(pRes));
}
taos_free_result(pRes);
char s[256] = {0};
for(int32_t i = 0; i < 7000; ++i) {
sprintf(s, "insert into tup values('2020-1-1 1:1:1', %d)", i);
pRes = taos_query(pConn, s);
taos_free_result(pRes);
}
}
#pragma GCC diagnostic pop
......@@ -1831,6 +1831,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
pSubmitBlk->suid = suid;
pSubmitBlk->uid = pDataBlock->info.groupId;
pSubmitBlk->numOfRows = rows;
pSubmitBlk->sversion = pTSchema->version;
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
......
......@@ -29,15 +29,9 @@ typedef struct {
#pragma pack(pop)
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
#define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
#define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define GET_BIT2(p, i) (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
// SValue
static FORCE_INLINE int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t n = 0;
if (IS_VAR_DATA_TYPE(type)) {
......@@ -88,7 +82,7 @@ static FORCE_INLINE int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
return n;
}
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t n = 0;
if (IS_VAR_DATA_TYPE(type)) {
......@@ -421,7 +415,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
_set_none:
if ((flags & 0xf0) == 0) {
setBitMap(pb, 0, iColumn - 1, flags);
if (flags & TSROW_HAS_VAL) { // set 0
if (flags & TSROW_HAS_VAL) { // set 0
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(VarDataOffsetT *)(pf + pTColumn->offset) = 0;
} else {
......@@ -434,7 +428,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
_set_null:
if ((flags & 0xf0) == 0) {
setBitMap(pb, 1, iColumn - 1, flags);
if (flags & TSROW_HAS_VAL) { // set 0
if (flags & TSROW_HAS_VAL) { // set 0
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(VarDataOffsetT *)(pf + pTColumn->offset) = 0;
} else {
......@@ -639,15 +633,15 @@ void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal
}
_return_none:
*pColVal = COL_VAL_NONE(pTColumn->colId);
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return;
_return_null:
*pColVal = COL_VAL_NULL(pTColumn->colId);
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return;
_return_value:
*pColVal = COL_VAL_VALUE(pTColumn->colId, value);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
return;
}
......@@ -1105,9 +1099,9 @@ _err:
#if 1 // ===================================================================================================================
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints;
if (IS_VAR_DATA_TYPE(pCol->type)) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
int spaceNeeded = pCol->bytes * maxPoints;
if (IS_VAR_DATA_TYPE(pCol->type)) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
}
#ifdef TD_SUPPORT_BITMAP
int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
......
......@@ -5445,6 +5445,37 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0;
}
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);
if (tEncodeU64(pCoder, pRes->suid) < 0) return -1;
if (tEncodeI32v(pCoder, nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tEncodeU64(pCoder, *(uint64_t *)taosArrayGet(pRes->uidList, iUid)) < 0) return -1;
}
if (tEncodeI64(pCoder, pRes->skey) < 0) return -1;
if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1;
if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1;
return 0;
}
int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
int32_t nUid;
uint64_t uid;
if (tDecodeU64(pCoder, &pRes->suid) < 0) return -1;
if (tDecodeI32v(pCoder, &nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tDecodeU64(pCoder, &uid) < 0) return -1;
taosArrayPush(pRes->uidList, &uid);
}
if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1;
if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1;
if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1;
return 0;
}
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
......
......@@ -34,6 +34,7 @@ const uint8_t tdVTypeByte[2][3] = {{
// declaration
static uint8_t tdGetBitmapByte(uint8_t byte);
static int32_t tdCompareColId(const void *arg1, const void *arg2);
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2);
// static void dataColSetNEleNull(SDataCol *pCol, int nEle);
......@@ -339,9 +340,9 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType, int8
}
bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode) {
int32_t nBytes = (bitmapMode == 0 ? numOfBits / TD_VTYPE_PARTS : numOfBits / TD_VTYPE_PARTS_I);
uint8_t vTypeByte = tdVTypeByte[bitmapMode][TD_VTYPE_NORM];
uint8_t *qBitmap = (uint8_t*)pBitmap;
int32_t nBytes = (bitmapMode == 0 ? numOfBits / TD_VTYPE_PARTS : numOfBits / TD_VTYPE_PARTS_I);
uint8_t vTypeByte = tdVTypeByte[bitmapMode][TD_VTYPE_NORM];
uint8_t *qBitmap = (uint8_t *)pBitmap;
for (int i = 0; i < nBytes; ++i) {
if (*qBitmap != vTypeByte) {
return false;
......@@ -1045,13 +1046,28 @@ int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode) {
return result;
}
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) {
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
return true;
}
int16_t nCols = tdRowGetNCols(pRow) - 1;
if (nCols <= 0) {
pVal->valType = TD_VTYPE_NONE;
return true;
}
SKvRowIdx *pColIdx =
(SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), nCols, sizeof(SKvRowIdx), compareKvRowColId, TD_EQ);
if (!pColIdx) {
pVal->valType = TD_VTYPE_NONE;
return true;
}
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx);
tdGetKvRowValOfCol(pVal, pRow, pBitmap, pColIdx->offset,
POINTER_DISTANCE(pColIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx));
return true;
}
......@@ -1204,6 +1220,112 @@ static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2
}
}
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
STColumn *pTColumn;
SColVal *pColVal;
int32_t nColVal = taosArrayGetSize(pArray);
int32_t varDataLen = 0;
int32_t maxVarDataLen = 0;
int32_t iColVal = 0;
void *varBuf = NULL;
ASSERT(nColVal > 1);
for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) {
pTColumn = &pTSchema->columns[iColumn];
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
} else {
pColVal = NULL;
}
if (iColumn == 0) {
ASSERT(pColVal->cid == pTColumn->colId);
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
} else {
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
if (pColVal) {
varDataLen += (pColVal->value.nData + sizeof(VarDataLenT));
if (maxVarDataLen < (pColVal->value.nData + sizeof(VarDataLenT))) {
maxVarDataLen = pColVal->value.nData + sizeof(VarDataLenT);
}
} else {
varDataLen += sizeof(VarDataLenT);
if (pTColumn->type == TSDB_DATA_TYPE_VARCHAR) {
varDataLen += CHAR_BYTES;
if (maxVarDataLen < CHAR_BYTES + sizeof(VarDataLenT)) {
maxVarDataLen = CHAR_BYTES + sizeof(VarDataLenT);
}
} else {
varDataLen += INT_BYTES;
if (maxVarDataLen < INT_BYTES + sizeof(VarDataLenT)) {
maxVarDataLen = INT_BYTES + sizeof(VarDataLenT);
}
}
}
}
}
++iColVal;
}
*ppRow = (STSRow *)taosMemoryCalloc(
1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1));
if (!(*ppRow)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (maxVarDataLen > 0) {
varBuf = taosMemoryMalloc(maxVarDataLen);
if (!varBuf) {
taosMemoryFreeClear(*ppRow);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetInfo(&rb, pTSchema->numOfCols, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, *ppRow);
iColVal = 0;
for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) {
pTColumn = &pTSchema->columns[iColumn];
TDRowValT valType = TD_VTYPE_NORM;
const void *val = NULL;
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
if (pColVal->isNone) {
valType = TD_VTYPE_NONE;
} else if (pColVal->isNull) {
valType = TD_VTYPE_NULL;
} else if (IS_VAR_DATA_TYPE(pTColumn->type)) {
varDataSetLen(varBuf, pColVal->value.nData);
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
val = varBuf;
} else {
val = (const void *)&pColVal->value.i64;
}
} else {
pColVal = NULL;
valType = TD_VTYPE_NONE;
}
tdAppendColValToRow(&rb, pTColumn->colId, pTColumn->type, valType, val, true, pTColumn->offset, iColVal);
++iColVal;
}
taosMemoryFreeClear(varBuf);
return 0;
}
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts;
......@@ -1593,7 +1715,6 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou
} else {
pBuilder->rowType = TD_ROW_TP;
}
pBuilder->flen = flen;
pBuilder->nCols = nCols;
pBuilder->nBoundCols = nBoundCols;
......@@ -1855,4 +1976,36 @@ void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) {
pIter->pSchema = pSchema;
pIter->maxColId = pSchema->columns[pSchema->numOfCols - 1].colId;
}
\ No newline at end of file
}
void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal) {
STColumn *pTColumn = &pTSchema->columns[iCol];
SCellVal cv;
SValue value;
ASSERT(iCol > 0);
if (TD_IS_TP_ROW(pRow)) {
tdSTpRowGetVal(pRow, pTColumn->colId, pTColumn->type, pTSchema->flen, pTColumn->offset, iCol - 1, &cv);
} else if (TD_IS_KV_ROW(pRow)) {
ASSERT(iCol > 0);
tdSKvRowGetVal(pRow, pTColumn->colId, iCol - 1, &cv);
} else {
ASSERT(0);
}
if (tdValTypeIsNone(cv.valType)) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
} else if (tdValTypeIsNull(cv.valType)) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
} else {
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
value.nData = varDataLen(cv.val);
value.pData = varDataVal(cv.val);
} else {
tGetValue(cv.val, &value, pTColumn->type);
}
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
}
}
......@@ -350,6 +350,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
......@@ -44,8 +44,12 @@ target_sources(
"src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbCache.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
# tq
"src/tq/tq.c"
......
......@@ -28,7 +28,6 @@
#include "tcommon.h"
#include "tfs.h"
#include "tmallocator.h"
#include "tmsg.h"
#include "trow.h"
......@@ -70,6 +69,10 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode);
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
......@@ -110,33 +113,31 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb
// typedef struct STsdb STsdb;
typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
int32_t tsdbSetTableId(tsdbReaderT reader, int64_t uid);
int32_t tsdbSetTableList(tsdbReaderT reader, SArray *tableList);
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
int32_t tsdbGetStbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_OFFSET_ORDER 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t *colId, int32_t numOfCols,
void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds);
int32_t tsdbLastrowReaderClose(void *pReader);
// tq
......
......@@ -62,6 +62,7 @@ struct STSmaStat {
struct SRSmaStat {
SSma *pSma;
int64_t submitVer;
int64_t refId; // shared by fetch tasks
void *tmrHandle; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks
......@@ -84,6 +85,7 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
enum {
TASK_TRIGGER_STAT_INIT = 0,
......@@ -208,11 +210,15 @@ struct STFInfo {
// specific fields
union {
struct {
int64_t applyVer[2];
int64_t submitVer;
} qTaskInfo;
};
};
enum {
TD_FTYPE_RSMA_QTASKINFO = 0,
};
struct STFile {
uint8_t state;
STFInfo info;
......
此差异已折叠。
......@@ -20,6 +20,7 @@
#include "filter.h"
#include "qworker.h"
#include "sync.h"
#include "tRealloc.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcompare.h"
......@@ -27,15 +28,15 @@
#include "tdatablock.h"
#include "tdb.h"
#include "tencode.h"
#include "tref.h"
#include "tfs.h"
#include "tglobal.h"
#include "tjson.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlosertree.h"
#include "tmallocator.h"
#include "tlrucache.h"
#include "tmsgcb.h"
#include "tref.h"
#include "tskiplist.h"
#include "tstream.h"
#include "ttime.h"
......@@ -94,6 +95,7 @@ int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int metaGetTbNum(SMeta* pMeta);
......@@ -127,9 +129,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
......@@ -157,6 +157,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId);
......@@ -171,6 +172,7 @@ int32_t smaPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
......@@ -196,9 +198,9 @@ typedef struct {
// SVState
struct SVState {
// int64_t processed;
int64_t committed;
int64_t applied;
int64_t commitID;
};
struct SVnodeInfo {
......
......@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) {
}
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
SMeta * pMeta = pReader->pMeta;
SMeta *pMeta = pReader->pMeta;
STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db
......@@ -54,7 +54,7 @@ _err:
}
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
SMeta * pMeta = pReader->pMeta;
SMeta *pMeta = pReader->pMeta;
int64_t version;
// query uid.idx
......@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
}
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
SMeta * pMeta = pReader->pMeta;
SMeta *pMeta = pReader->pMeta;
tb_uid_t uid;
// query name.idx
......@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
}
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
tb_uid_t uid = 0;
......@@ -138,7 +138,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
int metaTbCursorNext(SMTbCursor *pTbCur) {
int ret;
void * pBuf;
void *pBuf;
STbCfg tbCfg;
for (;;) {
......@@ -159,7 +159,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
int64_t version;
SSchemaWrapper schema = {0};
......@@ -218,9 +218,9 @@ _err:
return NULL;
}
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
TBC * pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
TBC *pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) {
return ret;
}
......@@ -235,13 +235,13 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
}
void *pKey = NULL;
int kLen = 0;
while(1){
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
break;
}
ttlKey = *(STtlIdxKey*)pKey;
ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid);
}
tdbTbcClose(pCur);
......@@ -252,11 +252,11 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
}
struct SMCtbCursor {
SMeta * pMeta;
TBC * pCur;
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
void * pKey;
void * pVal;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
......@@ -388,15 +388,15 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
if (ret < 0) {
return 0;
}
return *(tb_uid_t*)pStbCur->pKey;
return *(tb_uid_t *)pStbCur->pKey;
}
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
// SMetaReader mr = {0};
STSchema * pTSchema = NULL;
STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0};
SSchema * pSchema;
SSchema *pSchema;
pSW = metaGetTableSchema(pMeta, uid, sver, 0);
if (!pSW) return NULL;
......@@ -415,6 +415,51 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
return pTSchema;
}
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
int32_t code = 0;
STSchema *pTSchema = NULL;
SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
void *pData = NULL;
int nData = 0;
// query
metaRLock(pMeta);
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
code = TSDB_CODE_NOT_FOUND;
metaULock(pMeta);
goto _err;
}
metaULock(pMeta);
// decode
SDecoder dc = {0};
SSchemaWrapper schema;
SSchemaWrapper *pSchemaWrapper = &schema;
tDecoderInit(&dc, pData, nData);
tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
tDecoderClear(&dc);
// convert
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
for (int i = 0; i < pSchemaWrapper->nCols; i++) {
SSchema *pSchema = pSchemaWrapper->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
*ppTSchema = pTSchema;
taosMemoryFree(pSchemaWrapper->pSchema);
return code;
_err:
*ppTSchema = NULL;
return code;
}
int metaGetTbNum(SMeta *pMeta) {
// TODO
// ASSERT(0);
......@@ -422,11 +467,11 @@ int metaGetTbNum(SMeta *pMeta) {
}
typedef struct {
SMeta * pMeta;
TBC * pCur;
SMeta *pMeta;
TBC *pCur;
tb_uid_t uid;
void * pKey;
void * pVal;
void *pKey;
void *pVal;
int kLen;
int vLen;
} SMSmaCursor;
......@@ -498,7 +543,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL;
SArray * pSmaIds = NULL;
SArray *pSmaIds = NULL;
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL;
......@@ -522,7 +567,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
metaReaderInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma * pTSma = NULL;
STSma *pTSma = NULL;
for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
......@@ -570,7 +615,7 @@ _err:
}
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma * pTSma = NULL;
STSma *pTSma = NULL;
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
......@@ -592,7 +637,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
}
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
SArray * pUids = NULL;
SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
......@@ -630,7 +675,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
}
SArray *metaGetSmaTbUids(SMeta *pMeta) {
SArray * pUids = NULL;
SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t lastUid = 0;
......@@ -689,20 +734,20 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
}
typedef struct {
SMeta * pMeta;
TBC * pCur;
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
int16_t cid;
int16_t type;
void * pKey;
void * pVal;
void *pKey;
void *pVal;
int32_t kLen;
int32_t vLen;
} SIdxCursor;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
SIdxCursor *pCursor = NULL;
char * buf = NULL;
char *buf = NULL;
int32_t maxSize = 0;
int32_t ret = 0, valid = 0;
......@@ -721,7 +766,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t nKey = 0;
int32_t nTagData = 0;
void * tagData = NULL;
void *tagData = NULL;
if (param->val == NULL) {
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
......@@ -757,7 +802,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto END;
}
void * entryKey = NULL, *entryVal = NULL;
void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
bool first = true;
while (1) {
......
......@@ -523,6 +523,17 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray);
}
int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
if (level == TSDB_RETENTION_L0) {
return pSma->pVnode->state.applied;
}
SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv));
return atomic_load_64(&pRSmaStat->submitVer);
}
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
SArray *pResult = NULL;
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
......@@ -562,7 +573,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pRSmaInfo->pStat->submitVer, 1), pReq) < 0) {
taosMemoryFreeClear(pReq);
goto _err;
}
......@@ -814,6 +825,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
}
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
*committed = 0;
if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
......@@ -828,6 +840,18 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err;
}
STFInfo tFileInfo = {0};
if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
goto _err;
}
ASSERT(tFileInfo.qTaskInfo.submitVer > 0);
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer);
smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer);
SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);
......@@ -1094,6 +1118,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
}
STFile tFile = {0};
if (RSMA_SUBMIT_VER(pRSmaStat) > 0) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true;
}
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
......@@ -1129,12 +1169,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, -1) < 0) {
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
i + 1, TD_TFILE_FULL_NAME(&tFile));
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true;
}
......@@ -1161,6 +1201,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
}
if (isFileCreated) {
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->submitVer);
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno));
......
......@@ -32,6 +32,9 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer);
}
return tlen;
}
......@@ -41,6 +44,11 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
// specific
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer));
}
return buf;
}
......
......@@ -462,8 +462,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
tqDebug("vg %d, tq try get suid: %ld", TD_VID(pTq->pVnode), req.suid);
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
......
......@@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) {
.reader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
};
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
ASSERT(handle.execHandle.execCol.task[i]);
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"
typedef struct SLastrowReader {
SVnode* pVnode;
STSchema* pSchema;
uint64_t uid;
// int32_t* pSlotIds;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list
} SLastrowReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
int32_t numOfRows = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
SColVal colVal = {0};
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
} else {
tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (colVal.isNull || colVal.isNone) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[i], colVal.value.nData);
memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone);
}
}
}
pBlock->info.rows += 1;
}
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols,
void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->type = type;
p->pVnode = pVnode;
p->numOfCols = numOfCols;
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
p->pTableList = pTableIdList;
for(int32_t i = 0; i < p->numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(colId[i])) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
}
}
*pReader = p;
return TSDB_CODE_SUCCESS;
}
int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader;
for (int32_t i = 0; i < p->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]);
}
taosMemoryFree(p->transferBuf);
taosMemoryFree(pReader);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SLastrowReader* pr = pReader;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
// retrieve the only one last row of all tables in the uid list.
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (h == NULL) {
continue;
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
}
saveOneRow(pRow, pResBlock, pr, slotIds);
internalResult = true;
lastKey = pRow->ts;
}
tsdbCacheRelease(lruCache, h);
}
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (h == NULL) {
continue;
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
saveOneRow(pRow, pResBlock, pr, slotIds);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
}
}
} else {
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -25,8 +25,6 @@
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
......@@ -44,10 +42,12 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosInitRWLatch(&pMemTable->latch);
pMemTable->pTsdb = pTsdb;
pMemTable->nRef = 1;
pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX};
pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1};
pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN;
pMemTable->minVersion = VERSION_MAX;
pMemTable->maxVersion = VERSION_MIN;
pMemTable->nRow = 0;
pMemTable->nDelOp = 0;
pMemTable->nDel = 0;
pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *));
if (pMemTable->aTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -146,6 +146,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
SMemTable *pMemTable = pTsdb->mem;
STbData *pTbData = NULL;
SVBufPool *pPool = pTsdb->pVnode->inUse;
TSDBKEY lastKey = {.version = version, .ts = eKey};
// check if table exists (todo)
......@@ -155,26 +156,32 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
}
// do delete
SDelOp *pDelOp = (SDelOp *)vnodeBufPoolMalloc(pPool, sizeof(*pDelOp));
if (pDelOp == NULL) {
SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
if (pDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pDelOp->version = version;
pDelOp->sKey = sKey;
pDelOp->eKey = eKey;
pDelOp->pNext = NULL;
pDelData->version = version;
pDelData->sKey = sKey;
pDelData->eKey = eKey;
pDelData->pNext = NULL;
if (pTbData->pHead == NULL) {
ASSERT(pTbData->pTail == NULL);
pTbData->pHead = pTbData->pTail = pDelOp;
pTbData->pHead = pTbData->pTail = pDelData;
} else {
pTbData->pTail->pNext = pDelOp;
pTbData->pTail = pDelOp;
pTbData->pTail->pNext = pDelData;
pTbData->pTail = pDelData;
}
// update the state of pMemTable and other (todo)
pMemTable->nDelOp++;
pMemTable->minVersion = TMIN(pMemTable->minVersion, version);
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, version);
pMemTable->nDel++;
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDelete(pTsdb->lruCache, pTbData->uid, eKey);
}
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s",
......@@ -213,9 +220,15 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
SMemSkipListNode *pos[SL_MAX_LEVEL];
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
pHead = pTbData->sl.pHead;
pTail = pTbData->sl.pTail;
pIter->pTbData = pTbData;
pIter->backward = backward;
pIter->pRow = NULL;
pIter->row.type = 0;
if (pFrom == NULL) {
// create from head or tail
if (backward) {
......@@ -239,6 +252,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;
pIter->pRow = NULL;
if (pIter->backward) {
ASSERT(pIter->pNode != pTail);
......@@ -266,31 +280,29 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return true;
}
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) {
SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;
TSDBROW row = {0};
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
// we add here for commit usage
if (pIter == NULL) return NULL;
if (pRow == NULL) {
pRow = &row;
if (pIter->pRow) {
goto _exit;
}
if (pIter->backward) {
ASSERT(pIter->pNode != pTail);
if (pIter->pNode == pHead) {
return false;
if (pIter->pNode == pIter->pTbData->sl.pHead) {
goto _exit;
}
} else {
ASSERT(pIter->pNode != pHead);
if (pIter->pNode == pTail) {
return false;
if (pIter->pNode == pIter->pTbData->sl.pTail) {
goto _exit;
}
}
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), pRow);
return true;
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
pIter->pRow = &pIter->row;
_exit:
return pIter->pRow;
}
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
......@@ -317,8 +329,11 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
}
pTbData->suid = suid;
pTbData->uid = uid;
pTbData->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX};
pTbData->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1};
pTbData->minKey = TSKEY_MAX;
pTbData->maxKey = TSKEY_MIN;
pTbData->minVersion = VERSION_MAX;
pTbData->maxVersion = VERSION_MIN;
pTbData->maxSkmVer = -1;
pTbData->pHead = NULL;
pTbData->pTail = NULL;
pTbData->sl.seed = taosRand();
......@@ -493,8 +508,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
SSubmitBlkIter blkIter = {0};
TSDBKEY key = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW row = {.version = version, .pTSRow = NULL};
TSDBROW row = tsdbRowFromTSRow(version, NULL);
int32_t nRow = 0;
STSRow *pLastRow = NULL;
tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter);
......@@ -508,13 +524,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
goto _err;
}
if (tsdbKeyCmprFn(&key, &pTbData->minKey) < 0) {
pTbData->minKey = key;
}
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
if (tsdbKeyCmprFn(&key, &pMemTable->minKey) < 0) {
pMemTable->minKey = key;
}
pLastRow = row.pTSRow;
// forward put rest data
row.pTSRow = tGetSubmitBlkNext(&blkIter);
......@@ -531,18 +543,35 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
goto _err;
}
pLastRow = row.pTSRow;
row.pTSRow = tGetSubmitBlkNext(&blkIter);
} while (row.pTSRow);
}
if (tsdbKeyCmprFn(&key, &pTbData->maxKey) > 0) {
pTbData->maxKey = key;
}
if (key.ts >= pTbData->maxKey) {
if (key.ts > pTbData->maxKey) {
pTbData->maxKey = key.ts;
}
if (tsdbKeyCmprFn(&key, &pMemTable->maxKey) > 0) {
pMemTable->maxKey = key;
if (pLastRow != NULL) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
}
}
pMemTable->nRef++;
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
pTbData->minVersion = TMIN(pTbData->minVersion, version);
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
pTbData->maxSkmVer = TMAX(pTbData->maxSkmVer, pMsgIter->sversion);
// SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
pMemTable->minVersion = TMIN(pMemTable->minVersion, pTbData->minVersion);
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, pTbData->maxVersion);
pMemTable->nRow += nRow;
pRsp->numOfRows = nRow;
pRsp->affectedRows = nRow;
......@@ -552,22 +581,4 @@ _err:
return code;
}
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0;
n += tPutI64(p, pRow->version);
if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
n += pRow->pTSRow->len;
return n;
}
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0;
n += tGetI64(p, &pRow->version);
pRow->pTSRow = (STSRow *)(p + n);
n += pRow->pTSRow->len;
return n;
}
\ No newline at end of file
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
......@@ -17,7 +17,6 @@
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg);
// implementation
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg) {
......@@ -42,7 +41,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
int slen = 0;
*ppTsdb = NULL;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3;
slen = strlen(pVnode->path) + strlen(dir) + 2;
// create handle
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
......@@ -51,10 +50,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
return -1;
}
ASSERT(strlen(dir) < TSDB_DATA_DIR_LEN);
memcpy(pTsdb->dir, dir, strlen(dir));
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
taosRealPath(pTsdb->path, NULL, slen);
pTsdb->pVnode = pVnode;
pTsdb->repoLocked = false;
......@@ -64,13 +61,17 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
} else {
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
}
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir (TODO: use tfsMkdir)
taosMkDir(pTsdb->path);
// create dir
tfsMkdir(pVnode->pTfs, pTsdb->path);
// open tsdb
if (tsdbOpenFS(pTsdb) < 0) {
if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) {
goto _err;
}
if (tsdbOpenCache(pTsdb) < 0) {
goto _err;
}
......@@ -87,10 +88,9 @@ _err:
int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) {
// TODO: destroy mem/imem
taosThreadMutexDestroy(&(*pTsdb)->mutex);
tsdbCloseFS(*pTsdb);
tsdbFreeFS((*pTsdb)->fs);
tsdbFSClose((*pTsdb)->fs);
tsdbCloseCache((*pTsdb)->lruCache);
taosMemoryFreeClear(*pTsdb);
}
return 0;
......@@ -99,7 +99,7 @@ int tsdbClose(STsdb **pTsdb) {
int tsdbLockRepo(STsdb *pTsdb) {
int code = taosThreadMutexLock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d, failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
......@@ -108,13 +108,13 @@ int tsdbLockRepo(STsdb *pTsdb) {
}
int tsdbUnlockRepo(STsdb *pTsdb) {
ASSERT(IS_REPO_LOCKED(pTsdb));
// ASSERT(IS_REPO_LOCKED(pTsdb));
pTsdb->repoLocked = false;
int code = taosThreadMutexUnlock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d, failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
\ No newline at end of file
}
此差异已折叠。
此差异已折叠。
......@@ -28,7 +28,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
// scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d, failed to insert data since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
}
return -1;
}
......@@ -77,7 +77,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey);
TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
......@@ -92,7 +92,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days;
......
......@@ -40,6 +40,7 @@ int vnodeBegin(SVnode *pVnode) {
/* pthread_mutex_unlock(); */
pVnode->state.commitID++;
// begin meta
if (metaBegin(pVnode->pMeta) < 0) {
vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
......@@ -47,26 +48,21 @@ int vnodeBegin(SVnode *pVnode) {
}
// begin tsdb
if (pVnode->pSma) {
if (tsdbBegin(VND_RSMA0(pVnode)) < 0) {
vError("vgId:%d, failed to begin rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbBegin(pVnode->pTsdb) < 0) {
vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbBegin(VND_RSMA1(pVnode)) < 0) {
if (pVnode->pSma) {
if (VND_RSMA1(pVnode) && tsdbBegin(VND_RSMA1(pVnode)) < 0) {
vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbBegin(VND_RSMA2(pVnode)) < 0) {
if (VND_RSMA2(pVnode) && tsdbBegin(VND_RSMA2(pVnode)) < 0) {
vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
} else {
if (tsdbBegin(pVnode->pTsdb) < 0) {
vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
}
// begin sma
......@@ -218,7 +214,8 @@ int vnodeCommit(SVnode *pVnode) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
vInfo("vgId:%d, start to commit, version: %" PRId64, TD_VID(pVnode), pVnode->state.applied);
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied);
pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL;
......@@ -226,6 +223,7 @@ int vnodeCommit(SVnode *pVnode) {
// save info
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
if (vnodeSaveInfo(dir, &info) < 0) {
ASSERT(0);
......@@ -294,7 +292,7 @@ static int vnodeCommitImpl(void *arg) {
// metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTsdb);
// tsdbCommit(pVnode->pTsdb, );
// vnodeBufPoolRecycle(pVnode);
tsem_post(&(pVnode->canCommit));
......@@ -317,7 +315,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) {
const SVState *pState = (SVState *)pObj;
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1;
return 0;
}
......@@ -327,9 +325,9 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
int32_t code;
tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "applied version", pState->applied, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
if (code < 0) return -1;
return 0;
}
......
......@@ -37,6 +37,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.config = *pCfg;
info.state.committed = -1;
info.state.applied = -1;
info.state.commitID = 0;
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
......@@ -79,6 +80,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->config = info.config;
pVnode->state.committed = info.state.committed;
pVnode->state.applied = info.state.committed;
pVnode->state.commitID = info.state.commitID;
pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb;
pVnode->blockCount = 0;
......
......@@ -261,11 +261,49 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
}
}
// wrapper of tsdb read interface
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableList, uint64_t qId,
void *pMemRef) {
#if 0
return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
#endif
return 0;
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
taosArrayPush(list, &info);
}
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
void *vnodeGetIdx(SVnode *pVnode) {
if (pVnode == NULL) {
return NULL;
}
return metaGetIdx(pVnode->pMeta);
}
void *vnodeGetIvtIdx(SVnode *pVnode) {
if (pVnode == NULL) {
return NULL;
}
return metaGetIvtIdx(pVnode->pMeta);
}
......@@ -25,8 +25,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
......@@ -93,11 +93,44 @@ int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
}
} break;
case TDMT_VND_DELETE: {
int32_t size;
int32_t ret;
uint8_t *pCont;
SEncoder *pCoder = &(SEncoder){0};
SDeleteRes res = {0};
SReadHandle handle = {
.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
if (code) goto _err;
// malloc and encode
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
pCont = rpcMallocCont(size + sizeof(SMsgHead));
((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
tEncodeDeleteRes(pCoder, &res);
tEncoderClear(pCoder);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = pCont;
pMsg->contLen = size + sizeof(SMsgHead);
taosArrayDestroy(res.uidList);
} break;
default:
break;
}
return code;
_err:
vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
return code;
}
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
......@@ -146,7 +179,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
break;
case TDMT_VND_DELETE:
if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err;
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
/* TQ */
case TDMT_VND_MQ_VG_CHANGE:
......@@ -185,6 +218,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break;
case TDMT_VND_ALTER_CONFIG:
break;
case TDMT_VND_COMMIT:
goto _do_commit;
default:
ASSERT(0);
break;
......@@ -199,6 +234,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// commit if need
if (vnodeShouldCommit(pVnode)) {
_do_commit:
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
// commit current change
vnodeCommit(pVnode);
......@@ -280,22 +316,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
}
}
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
vTrace("message in write queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SDeleteRes res = {0};
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_VND_DELETE:
return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res);
default:
vError("unknown msg type:%d in write queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
......@@ -316,7 +336,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t t = ntohl(*(int32_t *)pReq);
vDebug("vgId:%d, recv ttl msg, time:%d", pVnode->config.vgId, t);
vError("rec ttl time:%d", t);
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
if (ret != 0) {
goto end;
......@@ -856,3 +876,31 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
// 3. reload sync
return 0;
}
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SDecoder *pCoder = &(SDecoder){0};
SDeleteRes *pRes = &(SDeleteRes){0};
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
pRes->skey, pRes->ekey);
if (code) goto _err;
}
tDecoderClear(pCoder);
taosArrayDestroy(pRes->uidList);
return code;
_err:
return code;
}
\ No newline at end of file
......@@ -565,7 +565,6 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
}
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
int32_t numOfCols = LIST_LENGTH(pProjects);
blockDataEnsureCapacity(pBlock, 1);
int32_t index = 0;
......@@ -579,7 +578,6 @@ int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
}
pBlock->info.rows = 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -106,7 +106,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
......
此差异已折叠。
此差异已折叠。
......@@ -143,6 +143,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
int64_t st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
uint64_t el = (taosGetTimestampUs() - st);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册