未验证 提交 d843a8ac 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #14486 from taosdata/feat/tsdb_refact

Feat/tsdb refact
...@@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT}) ...@@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT})
endif(${BUILD_WITH_TRAFT}) endif(${BUILD_WITH_TRAFT})
add_subdirectory(tdev) add_subdirectory(tdev)
add_subdirectory(lz4)
add_executable(lz4_test "")
target_sources(lz4_test
PRIVATE
"main.c"
)
target_link_libraries(lz4_test lz4_static)
\ No newline at end of file
#include <stdio.h>
#include "lz4.h"
int main(int argc, char const *argv[]) {
printf("%d\n", LZ4_compressBound(1024));
return 0;
}
Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12
...@@ -73,15 +73,17 @@ typedef struct { ...@@ -73,15 +73,17 @@ typedef struct {
uint64_t suid; uint64_t suid;
} STableListInfo; } STableListInfo;
#pragma pack(push, 1)
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
int16_t colId; int16_t colId;
int16_t maxIndex;
int16_t minIndex; int16_t minIndex;
int16_t maxIndex;
int16_t numOfNull; int16_t numOfNull;
int64_t sum; int64_t sum;
int64_t max; int64_t max;
int64_t min; int64_t min;
} SColumnDataAgg; } SColumnDataAgg;
#pragma pack(pop)
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {
STimeWindow window; STimeWindow window;
...@@ -122,13 +124,11 @@ typedef struct SColumnInfoData { ...@@ -122,13 +124,11 @@ typedef struct SColumnInfoData {
} SColumnInfoData; } SColumnInfoData;
typedef struct SQueryTableDataCond { typedef struct SQueryTableDataCond {
// STimeWindow twindow;
uint64_t suid; uint64_t suid;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList; SColumnInfo* colList;
bool loadExternalRows; // load external rows or not int32_t type; // data block load type:
int32_t type; // data block load type:
int32_t numOfTWindows; int32_t numOfTWindows;
STimeWindow* twindows; STimeWindow* twindows;
int64_t startVersion; int64_t startVersion;
......
...@@ -34,21 +34,40 @@ typedef struct SValue SValue; ...@@ -34,21 +34,40 @@ typedef struct SValue SValue;
typedef struct SColVal SColVal; typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2; typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder; typedef struct STSRowBuilder STSRowBuilder;
typedef struct SColData SColData;
typedef struct STagVal STagVal; typedef struct STagVal STagVal;
typedef struct STag STag; typedef struct STag STag;
// bitmap
#define N1(n) ((1 << (n)) - 1)
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
#define SET_BIT1(p, i, v) \
do { \
(p)[(i) / 8] &= N1((i) % 8); \
(p)[(i) / 8] |= (((uint8_t)(v)) << (((i) % 8))); \
} while (0)
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define SET_BIT2(p, i, v) \
do { \
p[(i) / 4] &= N1((i) % 4 * 2); \
(p)[(i) / 4] |= (((uint8_t)(v)) << (((i) % 4) * 2)); \
} while (0)
#define GET_BIT2(p, i) (((p)[(i) / 4] >> (((i) % 4) * 2)) & ((uint8_t)3))
// STSchema // STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema); int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema); void tTSchemaDestroy(STSchema *pTSchema);
// SValue // SValue
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type); int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
// STSRow2 // STSRow2
#define COL_VAL_NONE(CID) ((SColVal){.cid = (CID), .isNone = 1}) #define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1}) #define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)}) #define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow); int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow); int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
...@@ -140,6 +159,7 @@ struct SValue { ...@@ -140,6 +159,7 @@ struct SValue {
struct SColVal { struct SColVal {
int16_t cid; int16_t cid;
int8_t type;
int8_t isNone; int8_t isNone;
int8_t isNull; int8_t isNull;
SValue value; SValue value;
...@@ -172,12 +192,6 @@ struct STag { ...@@ -172,12 +192,6 @@ struct STag {
}; };
#pragma pack(pop) #pragma pack(pop)
struct SColData {
int16_t cid;
uint32_t nData;
uint8_t *pData;
};
#if 1 //================================================================================================================================================ #if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP #define TD_SUPPORT_BITMAP
......
...@@ -3024,6 +3024,17 @@ typedef struct { ...@@ -3024,6 +3024,17 @@ typedef struct {
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq); int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq);
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq); int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq);
typedef struct SDeleteRes {
uint64_t suid;
SArray* uidList;
int64_t skey;
int64_t ekey;
int64_t affectedRows;
} SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -200,6 +200,7 @@ enum { ...@@ -200,6 +200,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "commit vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG)
......
...@@ -299,6 +299,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp ...@@ -299,6 +299,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp
int32_t tdGetTpRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int8_t colType, int32_t offset, int32_t tdGetTpRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int8_t colType, int32_t offset,
int16_t colIdx); int16_t colIdx);
int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_t offset, int16_t colIdx); int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_t offset, int16_t colIdx);
void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal);
typedef struct { typedef struct {
STSchema *pSchema; STSchema *pSchema;
...@@ -312,6 +313,7 @@ typedef struct { ...@@ -312,6 +313,7 @@ typedef struct {
void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow); void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow);
void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema); void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema);
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow);
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal); bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal); bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal); bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal);
...@@ -320,7 +322,7 @@ STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema ...@@ -320,7 +322,7 @@ STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema
int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode); int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode);
bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx, bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx,
SCellVal *pVal); SCellVal *pVal);
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal); bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal);
int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode); int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode);
void tdSCellValPrint(SCellVal *pVal, int8_t colType); void tdSCellValPrint(SCellVal *pVal, int8_t colType);
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag); void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag);
......
...@@ -73,200 +73,201 @@ ...@@ -73,200 +73,201 @@
#define TK_MNODE 55 #define TK_MNODE 55
#define TK_DATABASE 56 #define TK_DATABASE 56
#define TK_USE 57 #define TK_USE 57
#define TK_IF 58 #define TK_FLUSH 58
#define TK_NOT 59 #define TK_IF 59
#define TK_EXISTS 60 #define TK_NOT 60
#define TK_BUFFER 61 #define TK_EXISTS 61
#define TK_CACHELAST 62 #define TK_BUFFER 62
#define TK_COMP 63 #define TK_CACHELAST 63
#define TK_DURATION 64 #define TK_COMP 64
#define TK_NK_VARIABLE 65 #define TK_DURATION 65
#define TK_FSYNC 66 #define TK_NK_VARIABLE 66
#define TK_MAXROWS 67 #define TK_FSYNC 67
#define TK_MINROWS 68 #define TK_MAXROWS 68
#define TK_KEEP 69 #define TK_MINROWS 69
#define TK_PAGES 70 #define TK_KEEP 70
#define TK_PAGESIZE 71 #define TK_PAGES 71
#define TK_PRECISION 72 #define TK_PAGESIZE 72
#define TK_REPLICA 73 #define TK_PRECISION 73
#define TK_STRICT 74 #define TK_REPLICA 74
#define TK_WAL 75 #define TK_STRICT 75
#define TK_VGROUPS 76 #define TK_WAL 76
#define TK_SINGLE_STABLE 77 #define TK_VGROUPS 77
#define TK_RETENTIONS 78 #define TK_SINGLE_STABLE 78
#define TK_SCHEMALESS 79 #define TK_RETENTIONS 79
#define TK_NK_COLON 80 #define TK_SCHEMALESS 80
#define TK_TABLE 81 #define TK_NK_COLON 81
#define TK_NK_LP 82 #define TK_TABLE 82
#define TK_NK_RP 83 #define TK_NK_LP 83
#define TK_STABLE 84 #define TK_NK_RP 84
#define TK_ADD 85 #define TK_STABLE 85
#define TK_COLUMN 86 #define TK_ADD 86
#define TK_MODIFY 87 #define TK_COLUMN 87
#define TK_RENAME 88 #define TK_MODIFY 88
#define TK_TAG 89 #define TK_RENAME 89
#define TK_SET 90 #define TK_TAG 90
#define TK_NK_EQ 91 #define TK_SET 91
#define TK_USING 92 #define TK_NK_EQ 92
#define TK_TAGS 93 #define TK_USING 93
#define TK_COMMENT 94 #define TK_TAGS 94
#define TK_BOOL 95 #define TK_COMMENT 95
#define TK_TINYINT 96 #define TK_BOOL 96
#define TK_SMALLINT 97 #define TK_TINYINT 97
#define TK_INT 98 #define TK_SMALLINT 98
#define TK_INTEGER 99 #define TK_INT 99
#define TK_BIGINT 100 #define TK_INTEGER 100
#define TK_FLOAT 101 #define TK_BIGINT 101
#define TK_DOUBLE 102 #define TK_FLOAT 102
#define TK_BINARY 103 #define TK_DOUBLE 103
#define TK_TIMESTAMP 104 #define TK_BINARY 104
#define TK_NCHAR 105 #define TK_TIMESTAMP 105
#define TK_UNSIGNED 106 #define TK_NCHAR 106
#define TK_JSON 107 #define TK_UNSIGNED 107
#define TK_VARCHAR 108 #define TK_JSON 108
#define TK_MEDIUMBLOB 109 #define TK_VARCHAR 109
#define TK_BLOB 110 #define TK_MEDIUMBLOB 110
#define TK_VARBINARY 111 #define TK_BLOB 111
#define TK_DECIMAL 112 #define TK_VARBINARY 112
#define TK_MAX_DELAY 113 #define TK_DECIMAL 113
#define TK_WATERMARK 114 #define TK_MAX_DELAY 114
#define TK_ROLLUP 115 #define TK_WATERMARK 115
#define TK_TTL 116 #define TK_ROLLUP 116
#define TK_SMA 117 #define TK_TTL 117
#define TK_FIRST 118 #define TK_SMA 118
#define TK_LAST 119 #define TK_FIRST 119
#define TK_SHOW 120 #define TK_LAST 120
#define TK_DATABASES 121 #define TK_SHOW 121
#define TK_TABLES 122 #define TK_DATABASES 122
#define TK_STABLES 123 #define TK_TABLES 123
#define TK_MNODES 124 #define TK_STABLES 124
#define TK_MODULES 125 #define TK_MNODES 125
#define TK_QNODES 126 #define TK_MODULES 126
#define TK_FUNCTIONS 127 #define TK_QNODES 127
#define TK_INDEXES 128 #define TK_FUNCTIONS 128
#define TK_ACCOUNTS 129 #define TK_INDEXES 129
#define TK_APPS 130 #define TK_ACCOUNTS 130
#define TK_CONNECTIONS 131 #define TK_APPS 131
#define TK_LICENCE 132 #define TK_CONNECTIONS 132
#define TK_GRANTS 133 #define TK_LICENCE 133
#define TK_QUERIES 134 #define TK_GRANTS 134
#define TK_SCORES 135 #define TK_QUERIES 135
#define TK_TOPICS 136 #define TK_SCORES 136
#define TK_VARIABLES 137 #define TK_TOPICS 137
#define TK_BNODES 138 #define TK_VARIABLES 138
#define TK_SNODES 139 #define TK_BNODES 139
#define TK_CLUSTER 140 #define TK_SNODES 140
#define TK_TRANSACTIONS 141 #define TK_CLUSTER 141
#define TK_DISTRIBUTED 142 #define TK_TRANSACTIONS 142
#define TK_CONSUMERS 143 #define TK_DISTRIBUTED 143
#define TK_SUBSCRIPTIONS 144 #define TK_CONSUMERS 144
#define TK_LIKE 145 #define TK_SUBSCRIPTIONS 145
#define TK_INDEX 146 #define TK_LIKE 146
#define TK_FUNCTION 147 #define TK_INDEX 147
#define TK_INTERVAL 148 #define TK_FUNCTION 148
#define TK_TOPIC 149 #define TK_INTERVAL 149
#define TK_AS 150 #define TK_TOPIC 150
#define TK_WITH 151 #define TK_AS 151
#define TK_META 152 #define TK_WITH 152
#define TK_CONSUMER 153 #define TK_META 153
#define TK_GROUP 154 #define TK_CONSUMER 154
#define TK_DESC 155 #define TK_GROUP 155
#define TK_DESCRIBE 156 #define TK_DESC 156
#define TK_RESET 157 #define TK_DESCRIBE 157
#define TK_QUERY 158 #define TK_RESET 158
#define TK_CACHE 159 #define TK_QUERY 159
#define TK_EXPLAIN 160 #define TK_CACHE 160
#define TK_ANALYZE 161 #define TK_EXPLAIN 161
#define TK_VERBOSE 162 #define TK_ANALYZE 162
#define TK_NK_BOOL 163 #define TK_VERBOSE 163
#define TK_RATIO 164 #define TK_NK_BOOL 164
#define TK_NK_FLOAT 165 #define TK_RATIO 165
#define TK_COMPACT 166 #define TK_NK_FLOAT 166
#define TK_VNODES 167 #define TK_COMPACT 167
#define TK_IN 168 #define TK_VNODES 168
#define TK_OUTPUTTYPE 169 #define TK_IN 169
#define TK_AGGREGATE 170 #define TK_OUTPUTTYPE 170
#define TK_BUFSIZE 171 #define TK_AGGREGATE 171
#define TK_STREAM 172 #define TK_BUFSIZE 172
#define TK_INTO 173 #define TK_STREAM 173
#define TK_TRIGGER 174 #define TK_INTO 174
#define TK_AT_ONCE 175 #define TK_TRIGGER 175
#define TK_WINDOW_CLOSE 176 #define TK_AT_ONCE 176
#define TK_IGNORE 177 #define TK_WINDOW_CLOSE 177
#define TK_EXPIRED 178 #define TK_IGNORE 178
#define TK_KILL 179 #define TK_EXPIRED 179
#define TK_CONNECTION 180 #define TK_KILL 180
#define TK_TRANSACTION 181 #define TK_CONNECTION 181
#define TK_BALANCE 182 #define TK_TRANSACTION 182
#define TK_VGROUP 183 #define TK_BALANCE 183
#define TK_MERGE 184 #define TK_VGROUP 184
#define TK_REDISTRIBUTE 185 #define TK_MERGE 185
#define TK_SPLIT 186 #define TK_REDISTRIBUTE 186
#define TK_SYNCDB 187 #define TK_SPLIT 187
#define TK_DELETE 188 #define TK_SYNCDB 188
#define TK_INSERT 189 #define TK_DELETE 189
#define TK_NULL 190 #define TK_INSERT 190
#define TK_NK_QUESTION 191 #define TK_NULL 191
#define TK_NK_ARROW 192 #define TK_NK_QUESTION 192
#define TK_ROWTS 193 #define TK_NK_ARROW 193
#define TK_TBNAME 194 #define TK_ROWTS 194
#define TK_QSTARTTS 195 #define TK_TBNAME 195
#define TK_QENDTS 196 #define TK_QSTARTTS 196
#define TK_WSTARTTS 197 #define TK_QENDTS 197
#define TK_WENDTS 198 #define TK_WSTARTTS 198
#define TK_WDURATION 199 #define TK_WENDTS 199
#define TK_CAST 200 #define TK_WDURATION 200
#define TK_NOW 201 #define TK_CAST 201
#define TK_TODAY 202 #define TK_NOW 202
#define TK_TIMEZONE 203 #define TK_TODAY 203
#define TK_CLIENT_VERSION 204 #define TK_TIMEZONE 204
#define TK_SERVER_VERSION 205 #define TK_CLIENT_VERSION 205
#define TK_SERVER_STATUS 206 #define TK_SERVER_VERSION 206
#define TK_CURRENT_USER 207 #define TK_SERVER_STATUS 207
#define TK_COUNT 208 #define TK_CURRENT_USER 208
#define TK_LAST_ROW 209 #define TK_COUNT 209
#define TK_BETWEEN 210 #define TK_LAST_ROW 210
#define TK_IS 211 #define TK_BETWEEN 211
#define TK_NK_LT 212 #define TK_IS 212
#define TK_NK_GT 213 #define TK_NK_LT 213
#define TK_NK_LE 214 #define TK_NK_GT 214
#define TK_NK_GE 215 #define TK_NK_LE 215
#define TK_NK_NE 216 #define TK_NK_GE 216
#define TK_MATCH 217 #define TK_NK_NE 217
#define TK_NMATCH 218 #define TK_MATCH 218
#define TK_CONTAINS 219 #define TK_NMATCH 219
#define TK_JOIN 220 #define TK_CONTAINS 220
#define TK_INNER 221 #define TK_JOIN 221
#define TK_SELECT 222 #define TK_INNER 222
#define TK_DISTINCT 223 #define TK_SELECT 223
#define TK_WHERE 224 #define TK_DISTINCT 224
#define TK_PARTITION 225 #define TK_WHERE 225
#define TK_BY 226 #define TK_PARTITION 226
#define TK_SESSION 227 #define TK_BY 227
#define TK_STATE_WINDOW 228 #define TK_SESSION 228
#define TK_SLIDING 229 #define TK_STATE_WINDOW 229
#define TK_FILL 230 #define TK_SLIDING 230
#define TK_VALUE 231 #define TK_FILL 231
#define TK_NONE 232 #define TK_VALUE 232
#define TK_PREV 233 #define TK_NONE 233
#define TK_LINEAR 234 #define TK_PREV 234
#define TK_NEXT 235 #define TK_LINEAR 235
#define TK_HAVING 236 #define TK_NEXT 236
#define TK_RANGE 237 #define TK_HAVING 237
#define TK_EVERY 238 #define TK_RANGE 238
#define TK_ORDER 239 #define TK_EVERY 239
#define TK_SLIMIT 240 #define TK_ORDER 240
#define TK_SOFFSET 241 #define TK_SLIMIT 241
#define TK_LIMIT 242 #define TK_SOFFSET 242
#define TK_OFFSET 243 #define TK_LIMIT 243
#define TK_ASC 244 #define TK_OFFSET 244
#define TK_NULLS 245 #define TK_ASC 245
#define TK_ID 246 #define TK_NULLS 246
#define TK_NK_BITNOT 247 #define TK_ID 247
#define TK_VALUES 248 #define TK_NK_BITNOT 248
#define TK_IMPORT 249 #define TK_VALUES 249
#define TK_NK_SEMI 250 #define TK_IMPORT 250
#define TK_FILE 251 #define TK_NK_SEMI 251
#define TK_FILE 252
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -36,6 +36,8 @@ typedef struct SReadHandle { ...@@ -36,6 +36,8 @@ typedef struct SReadHandle {
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
// int8_t initTsdbReader;
bool tqReader; bool tqReader;
} SReadHandle; } SReadHandle;
......
...@@ -97,6 +97,11 @@ typedef struct SAlterDatabaseStmt { ...@@ -97,6 +97,11 @@ typedef struct SAlterDatabaseStmt {
SDatabaseOptions* pOptions; SDatabaseOptions* pOptions;
} SAlterDatabaseStmt; } SAlterDatabaseStmt;
typedef struct SFlushDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
} SFlushDatabaseStmt;
typedef struct STableOptions { typedef struct STableOptions {
ENodeType type; ENodeType type;
bool commentNull; bool commentNull;
......
...@@ -111,6 +111,7 @@ typedef enum ENodeType { ...@@ -111,6 +111,7 @@ typedef enum ENodeType {
QUERY_NODE_CREATE_DATABASE_STMT, QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_DROP_DATABASE_STMT, QUERY_NODE_DROP_DATABASE_STMT,
QUERY_NODE_ALTER_DATABASE_STMT, QUERY_NODE_ALTER_DATABASE_STMT,
QUERY_NODE_FLUSH_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT, QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_CREATE_SUBTABLE_CLAUSE, QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
QUERY_NODE_CREATE_MULTI_TABLE_STMT, QUERY_NODE_CREATE_MULTI_TABLE_STMT,
......
...@@ -20,9 +20,9 @@ ...@@ -20,9 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include "executor.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "trpc.h" #include "trpc.h"
#include "executor.h"
enum { enum {
NODE_TYPE_VNODE = 1, NODE_TYPE_VNODE = 1,
...@@ -31,13 +31,6 @@ enum { ...@@ -31,13 +31,6 @@ enum {
NODE_TYPE_MNODE, NODE_TYPE_MNODE,
}; };
typedef struct SDeleteRes {
uint64_t suid;
SArray* uidList;
int64_t skey;
int64_t ekey;
} SDeleteRes;
typedef struct SQWorkerCfg { typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum; uint32_t maxSchedulerNum;
uint32_t maxTaskNum; uint32_t maxTaskNum;
...@@ -46,19 +39,19 @@ typedef struct SQWorkerCfg { ...@@ -46,19 +39,19 @@ typedef struct SQWorkerCfg {
typedef struct { typedef struct {
uint64_t cacheDataSize; uint64_t cacheDataSize;
uint64_t queryProcessed; uint64_t queryProcessed;
uint64_t cqueryProcessed; uint64_t cqueryProcessed;
uint64_t fetchProcessed; uint64_t fetchProcessed;
uint64_t dropProcessed; uint64_t dropProcessed;
uint64_t hbProcessed; uint64_t hbProcessed;
uint64_t deleteProcessed; uint64_t deleteProcessed;
uint64_t numOfQueryInQueue; uint64_t numOfQueryInQueue;
uint64_t numOfFetchInQueue; uint64_t numOfFetchInQueue;
uint64_t timeInQueryQueue; uint64_t timeInQueryQueue;
uint64_t timeInFetchQueue; uint64_t timeInFetchQueue;
uint64_t numOfErrors; uint64_t numOfErrors;
} SQWorkerStat; } SQWorkerStat;
...@@ -82,7 +75,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 ...@@ -82,7 +75,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes); int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
void qWorkerDestroy(void **qWorkerMgmt); void qWorkerDestroy(void **qWorkerMgmt);
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_UTIL_MALLOCATOR_H_ #ifndef _TD_UTIL_TREALLOC_H_
#define _TD_UTIL_MALLOCATOR_H_ #define _TD_UTIL_TREALLOC_H_
#include "os.h" #include "os.h"
...@@ -22,34 +22,43 @@ ...@@ -22,34 +22,43 @@
extern "C" { extern "C" {
#endif #endif
// Memory allocator static FORCE_INLINE int32_t tRealloc(uint8_t **ppBuf, int64_t size) {
#define TD_MEM_ALCT(TYPE) \ int32_t code = 0;
struct { \ int64_t bsize = 0;
void *(*malloc_)(struct TYPE *, uint64_t size); \ uint8_t *pBuf;
void (*free_)(struct TYPE *, void *ptr); \
if (*ppBuf) {
bsize = *(int64_t *)((*ppBuf) - sizeof(int64_t));
}
if (bsize >= size) goto _exit;
if (bsize == 0) bsize = 64;
while (bsize < size) {
bsize *= 2;
} }
#define TD_MA_MALLOC_FUNC(TMA) (TMA)->malloc_
#define TD_MA_FREE_FUNC(TMA) (TMA)->free_
#define TD_MA_MALLOC(TMA, SIZE) (*((TMA)->malloc_))(TMA, (SIZE)) pBuf = (uint8_t *)taosMemoryRealloc(*ppBuf ? (*ppBuf) - sizeof(int64_t) : *ppBuf, bsize + sizeof(int64_t));
#define TD_MA_FREE(TMA, PTR) (*((TMA)->free_))(TMA, (PTR)) if (pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
typedef struct SMemAllocator { *(int64_t *)pBuf = bsize;
void *impl; *ppBuf = pBuf + sizeof(int64_t);
TD_MEM_ALCT(SMemAllocator);
} SMemAllocator;
#define tMalloc(pMA, SIZE) TD_MA_MALLOC(PMA, SIZE) _exit:
#define tFree(pMA, PTR) TD_MA_FREE(PMA, PTR) return code;
}
typedef struct SMemAllocatorFactory { static FORCE_INLINE void tFree(uint8_t *pBuf) {
void *impl; if (pBuf) {
SMemAllocator *(*create)(struct SMemAllocatorFactory *); taosMemoryFree(pBuf - sizeof(int64_t));
void (*destroy)(struct SMemAllocatorFactory *, SMemAllocator *); }
} SMemAllocatorFactory; }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_UTIL_MALLOCATOR_H_*/ #endif /*_TD_UTIL_TREALLOC_H_*/
\ No newline at end of file
...@@ -72,6 +72,7 @@ int32_t* taosGetErrno(); ...@@ -72,6 +72,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
...@@ -669,13 +669,13 @@ TEST(testCase, projection_query_tables) { ...@@ -669,13 +669,13 @@ TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
...@@ -700,54 +700,55 @@ TEST(testCase, projection_query_tables) { ...@@ -700,54 +700,55 @@ TEST(testCase, projection_query_tables) {
printf("create table :%d\n", i); printf("create table :%d\n", i);
createNewTable(pConn, i); createNewTable(pConn, i);
} }
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// TAOS_ROW pRow = NULL; pRes = taos_query(pConn, "select * from tu");
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); if (taos_errno(pRes) != 0) {
// int32_t numOfFields = taos_num_fields(pRes); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// char str[512] = {0}; ASSERT_TRUE(false);
// while ((pRow = taos_fetch_row(pRes)) != NULL) { }
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
// taos_free_result(pRes); TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
//TEST(testCase, projection_query_stables) { TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts from st1"); pRes = taos_query(pConn, "select ts from st1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); taos_free_result(pRes);
// ASSERT_TRUE(false); ASSERT_TRUE(false);
// } }
//
// TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0}; char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str); printf("%s\n", str);
// } }
//
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
...@@ -773,7 +774,7 @@ TEST(testCase, agg_query_tables) { ...@@ -773,7 +774,7 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
/* /*
--- copy the following script in the shell to setup the environment --- --- copy the following script in the shell to setup the environment ---
...@@ -819,5 +820,27 @@ TEST(testCase, async_api_test) { ...@@ -819,5 +820,27 @@ TEST(testCase, async_api_test) {
getchar(); getchar();
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s", taos_errstr(pRes));
}
taos_free_result(pRes);
char s[256] = {0};
for(int32_t i = 0; i < 7000; ++i) {
sprintf(s, "insert into tup values('2020-1-1 1:1:1', %d)", i);
pRes = taos_query(pConn, s);
taos_free_result(pRes);
}
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -1831,6 +1831,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1831,6 +1831,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
pSubmitBlk->suid = suid; pSubmitBlk->suid = suid;
pSubmitBlk->uid = pDataBlock->info.groupId; pSubmitBlk->uid = pDataBlock->info.groupId;
pSubmitBlk->numOfRows = rows; pSubmitBlk->numOfRows = rows;
pSubmitBlk->sversion = pTSchema->version;
msgLen += sizeof(SSubmitBlk); msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0; int32_t dataLen = 0;
......
...@@ -29,15 +29,9 @@ typedef struct { ...@@ -29,15 +29,9 @@ typedef struct {
#pragma pack(pop) #pragma pack(pop)
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW) #define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
#define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
#define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define GET_BIT2(p, i) (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
// SValue // SValue
static FORCE_INLINE int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) { int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t n = 0; int32_t n = 0;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
...@@ -88,7 +82,7 @@ static FORCE_INLINE int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) { ...@@ -88,7 +82,7 @@ static FORCE_INLINE int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
return n; return n;
} }
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) { int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t n = 0; int32_t n = 0;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
...@@ -421,7 +415,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S ...@@ -421,7 +415,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
_set_none: _set_none:
if ((flags & 0xf0) == 0) { if ((flags & 0xf0) == 0) {
setBitMap(pb, 0, iColumn - 1, flags); setBitMap(pb, 0, iColumn - 1, flags);
if (flags & TSROW_HAS_VAL) { // set 0 if (flags & TSROW_HAS_VAL) { // set 0
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(VarDataOffsetT *)(pf + pTColumn->offset) = 0; *(VarDataOffsetT *)(pf + pTColumn->offset) = 0;
} else { } else {
...@@ -434,7 +428,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S ...@@ -434,7 +428,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
_set_null: _set_null:
if ((flags & 0xf0) == 0) { if ((flags & 0xf0) == 0) {
setBitMap(pb, 1, iColumn - 1, flags); setBitMap(pb, 1, iColumn - 1, flags);
if (flags & TSROW_HAS_VAL) { // set 0 if (flags & TSROW_HAS_VAL) { // set 0
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(VarDataOffsetT *)(pf + pTColumn->offset) = 0; *(VarDataOffsetT *)(pf + pTColumn->offset) = 0;
} else { } else {
...@@ -639,15 +633,15 @@ void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal ...@@ -639,15 +633,15 @@ void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal
} }
_return_none: _return_none:
*pColVal = COL_VAL_NONE(pTColumn->colId); *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return; return;
_return_null: _return_null:
*pColVal = COL_VAL_NULL(pTColumn->colId); *pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return; return;
_return_value: _return_value:
*pColVal = COL_VAL_VALUE(pTColumn->colId, value); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
return; return;
} }
...@@ -1105,9 +1099,9 @@ _err: ...@@ -1105,9 +1099,9 @@ _err:
#if 1 // =================================================================================================================== #if 1 // ===================================================================================================================
static void dataColSetNEleNull(SDataCol *pCol, int nEle); static void dataColSetNEleNull(SDataCol *pCol, int nEle);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints; int spaceNeeded = pCol->bytes * maxPoints;
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
} }
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints); int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
......
...@@ -5445,6 +5445,37 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { ...@@ -5445,6 +5445,37 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0; return 0;
} }
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);
if (tEncodeU64(pCoder, pRes->suid) < 0) return -1;
if (tEncodeI32v(pCoder, nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tEncodeU64(pCoder, *(uint64_t *)taosArrayGet(pRes->uidList, iUid)) < 0) return -1;
}
if (tEncodeI64(pCoder, pRes->skey) < 0) return -1;
if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1;
if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1;
return 0;
}
int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
int32_t nUid;
uint64_t uid;
if (tDecodeU64(pCoder, &pRes->suid) < 0) return -1;
if (tDecodeI32v(pCoder, &nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tDecodeU64(pCoder, &uid) < 0) return -1;
taosArrayPush(pRes->uidList, &uid);
}
if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1;
if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1;
if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1;
return 0;
}
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
......
...@@ -34,6 +34,7 @@ const uint8_t tdVTypeByte[2][3] = {{ ...@@ -34,6 +34,7 @@ const uint8_t tdVTypeByte[2][3] = {{
// declaration // declaration
static uint8_t tdGetBitmapByte(uint8_t byte); static uint8_t tdGetBitmapByte(uint8_t byte);
static int32_t tdCompareColId(const void *arg1, const void *arg2); static int32_t tdCompareColId(const void *arg1, const void *arg2);
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2);
// static void dataColSetNEleNull(SDataCol *pCol, int nEle); // static void dataColSetNEleNull(SDataCol *pCol, int nEle);
...@@ -339,9 +340,9 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType, int8 ...@@ -339,9 +340,9 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType, int8
} }
bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode) { bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode) {
int32_t nBytes = (bitmapMode == 0 ? numOfBits / TD_VTYPE_PARTS : numOfBits / TD_VTYPE_PARTS_I); int32_t nBytes = (bitmapMode == 0 ? numOfBits / TD_VTYPE_PARTS : numOfBits / TD_VTYPE_PARTS_I);
uint8_t vTypeByte = tdVTypeByte[bitmapMode][TD_VTYPE_NORM]; uint8_t vTypeByte = tdVTypeByte[bitmapMode][TD_VTYPE_NORM];
uint8_t *qBitmap = (uint8_t*)pBitmap; uint8_t *qBitmap = (uint8_t *)pBitmap;
for (int i = 0; i < nBytes; ++i) { for (int i = 0; i < nBytes; ++i) {
if (*qBitmap != vTypeByte) { if (*qBitmap != vTypeByte) {
return false; return false;
...@@ -1045,13 +1046,28 @@ int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode) { ...@@ -1045,13 +1046,28 @@ int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode) {
return result; return result;
} }
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow)); tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
return true; return true;
} }
int16_t nCols = tdRowGetNCols(pRow) - 1;
if (nCols <= 0) {
pVal->valType = TD_VTYPE_NONE;
return true;
}
SKvRowIdx *pColIdx =
(SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), nCols, sizeof(SKvRowIdx), compareKvRowColId, TD_EQ);
if (!pColIdx) {
pVal->valType = TD_VTYPE_NONE;
return true;
}
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow)); void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx); tdGetKvRowValOfCol(pVal, pRow, pBitmap, pColIdx->offset,
POINTER_DISTANCE(pColIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx));
return true; return true;
} }
...@@ -1204,6 +1220,112 @@ static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2 ...@@ -1204,6 +1220,112 @@ static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2
} }
} }
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
STColumn *pTColumn;
SColVal *pColVal;
int32_t nColVal = taosArrayGetSize(pArray);
int32_t varDataLen = 0;
int32_t maxVarDataLen = 0;
int32_t iColVal = 0;
void *varBuf = NULL;
ASSERT(nColVal > 1);
for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) {
pTColumn = &pTSchema->columns[iColumn];
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
} else {
pColVal = NULL;
}
if (iColumn == 0) {
ASSERT(pColVal->cid == pTColumn->colId);
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
} else {
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
if (pColVal) {
varDataLen += (pColVal->value.nData + sizeof(VarDataLenT));
if (maxVarDataLen < (pColVal->value.nData + sizeof(VarDataLenT))) {
maxVarDataLen = pColVal->value.nData + sizeof(VarDataLenT);
}
} else {
varDataLen += sizeof(VarDataLenT);
if (pTColumn->type == TSDB_DATA_TYPE_VARCHAR) {
varDataLen += CHAR_BYTES;
if (maxVarDataLen < CHAR_BYTES + sizeof(VarDataLenT)) {
maxVarDataLen = CHAR_BYTES + sizeof(VarDataLenT);
}
} else {
varDataLen += INT_BYTES;
if (maxVarDataLen < INT_BYTES + sizeof(VarDataLenT)) {
maxVarDataLen = INT_BYTES + sizeof(VarDataLenT);
}
}
}
}
}
++iColVal;
}
*ppRow = (STSRow *)taosMemoryCalloc(
1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1));
if (!(*ppRow)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (maxVarDataLen > 0) {
varBuf = taosMemoryMalloc(maxVarDataLen);
if (!varBuf) {
taosMemoryFreeClear(*ppRow);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetInfo(&rb, pTSchema->numOfCols, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, *ppRow);
iColVal = 0;
for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) {
pTColumn = &pTSchema->columns[iColumn];
TDRowValT valType = TD_VTYPE_NORM;
const void *val = NULL;
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
if (pColVal->isNone) {
valType = TD_VTYPE_NONE;
} else if (pColVal->isNull) {
valType = TD_VTYPE_NULL;
} else if (IS_VAR_DATA_TYPE(pTColumn->type)) {
varDataSetLen(varBuf, pColVal->value.nData);
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
val = varBuf;
} else {
val = (const void *)&pColVal->value.i64;
}
} else {
pColVal = NULL;
valType = TD_VTYPE_NONE;
}
tdAppendColValToRow(&rb, pTColumn->colId, pTColumn->type, valType, val, true, pTColumn->offset, iColVal);
++iColVal;
}
taosMemoryFreeClear(varBuf);
return 0;
}
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) { bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts; pVal->val = &pIter->pRow->ts;
...@@ -1593,7 +1715,6 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou ...@@ -1593,7 +1715,6 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou
} else { } else {
pBuilder->rowType = TD_ROW_TP; pBuilder->rowType = TD_ROW_TP;
} }
pBuilder->flen = flen; pBuilder->flen = flen;
pBuilder->nCols = nCols; pBuilder->nCols = nCols;
pBuilder->nBoundCols = nBoundCols; pBuilder->nBoundCols = nBoundCols;
...@@ -1855,4 +1976,36 @@ void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) { ...@@ -1855,4 +1976,36 @@ void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) { void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) {
pIter->pSchema = pSchema; pIter->pSchema = pSchema;
pIter->maxColId = pSchema->columns[pSchema->numOfCols - 1].colId; pIter->maxColId = pSchema->columns[pSchema->numOfCols - 1].colId;
} }
\ No newline at end of file
void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal) {
STColumn *pTColumn = &pTSchema->columns[iCol];
SCellVal cv;
SValue value;
ASSERT(iCol > 0);
if (TD_IS_TP_ROW(pRow)) {
tdSTpRowGetVal(pRow, pTColumn->colId, pTColumn->type, pTSchema->flen, pTColumn->offset, iCol - 1, &cv);
} else if (TD_IS_KV_ROW(pRow)) {
ASSERT(iCol > 0);
tdSKvRowGetVal(pRow, pTColumn->colId, iCol - 1, &cv);
} else {
ASSERT(0);
}
if (tdValTypeIsNone(cv.valType)) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
} else if (tdValTypeIsNull(cv.valType)) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
} else {
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
value.nData = varDataLen(cv.val);
value.pData = varDataVal(cv.val);
} else {
tGetValue(cv.val, &value, pTColumn->type);
}
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
}
}
...@@ -350,6 +350,7 @@ SArray *vmGetMsgHandles() { ...@@ -350,6 +350,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
...@@ -44,8 +44,12 @@ target_sources( ...@@ -44,8 +44,12 @@ target_sources(
"src/tsdb/tsdbMemTable.c" "src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbRead.c" "src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbCache.c"
"src/tsdb/tsdbWrite.c" "src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c" "src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
# tq # tq
"src/tq/tq.c" "src/tq/tq.c"
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#include "tcommon.h" #include "tcommon.h"
#include "tfs.h" #include "tfs.h"
#include "tmallocator.h"
#include "tmsg.h" #include "tmsg.h"
#include "trow.h" #include "trow.h"
...@@ -70,6 +69,10 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int ...@@ -70,6 +69,10 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode);
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
...@@ -110,33 +113,31 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -110,33 +113,31 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb // tsdb
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef void *tsdbReaderT; typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_OFFSET_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLESEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_EXTERN_ORDER 3
int32_t tsdbSetTableId(tsdbReaderT reader, int64_t uid); #define LASTROW_RETRIEVE_TYPE_ALL 0x1
int32_t tsdbSetTableList(tsdbReaderT reader, SArray *tableList); #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
uint64_t taskId); int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
void *pMemRef); const char *idstr);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); void tsdbReaderClose(STsdbReader *pReader);
bool isTsdbCacheLastRow(tsdbReaderT *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
int32_t tsdbGetStbIdList(SMeta *pMeta, int64_t suid, SArray *list); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
void *tsdbGetIdx(SMeta *pMeta); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
void *tsdbGetIvtIdx(SMeta *pMeta); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t *colId, int32_t numOfCols,
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void **pReader);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbLastrowReaderClose(void *pReader);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
// tq // tq
......
...@@ -62,6 +62,7 @@ struct STSmaStat { ...@@ -62,6 +62,7 @@ struct STSmaStat {
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t submitVer;
int64_t refId; // shared by fetch tasks int64_t refId; // shared by fetch tasks
void *tmrHandle; // shared by fetch tasks void *tmrHandle; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
...@@ -84,6 +85,7 @@ struct SSmaStat { ...@@ -84,6 +85,7 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat) #define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId) #define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
enum { enum {
TASK_TRIGGER_STAT_INIT = 0, TASK_TRIGGER_STAT_INIT = 0,
...@@ -208,11 +210,15 @@ struct STFInfo { ...@@ -208,11 +210,15 @@ struct STFInfo {
// specific fields // specific fields
union { union {
struct { struct {
int64_t applyVer[2]; int64_t submitVer;
} qTaskInfo; } qTaskInfo;
}; };
}; };
enum {
TD_FTYPE_RSMA_QTASKINFO = 0,
};
struct STFile { struct STFile {
uint8_t state; uint8_t state;
STFInfo info; STFInfo info;
......
此差异已折叠。
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "filter.h" #include "filter.h"
#include "qworker.h" #include "qworker.h"
#include "sync.h" #include "sync.h"
#include "tRealloc.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcompare.h" #include "tcompare.h"
...@@ -27,15 +28,15 @@ ...@@ -27,15 +28,15 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tdb.h" #include "tdb.h"
#include "tencode.h" #include "tencode.h"
#include "tref.h"
#include "tfs.h" #include "tfs.h"
#include "tglobal.h" #include "tglobal.h"
#include "tjson.h" #include "tjson.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tmallocator.h" #include "tlrucache.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tref.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tstream.h" #include "tstream.h"
#include "ttime.h" #include "ttime.h"
...@@ -94,6 +95,7 @@ int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); ...@@ -94,6 +95,7 @@ int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name); tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int metaGetTbNum(SMeta* pMeta); int metaGetTbNum(SMeta* pMeta);
...@@ -127,9 +129,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub ...@@ -127,9 +129,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
...@@ -157,6 +157,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); ...@@ -157,6 +157,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId); const char* stbFullName, int32_t vgId);
...@@ -171,6 +172,7 @@ int32_t smaPostCommit(SSma* pSma); ...@@ -171,6 +172,7 @@ int32_t smaPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq); int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
...@@ -196,9 +198,9 @@ typedef struct { ...@@ -196,9 +198,9 @@ typedef struct {
// SVState // SVState
struct SVState { struct SVState {
// int64_t processed;
int64_t committed; int64_t committed;
int64_t applied; int64_t applied;
int64_t commitID;
}; };
struct SVnodeInfo { struct SVnodeInfo {
......
...@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) { ...@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) {
} }
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) { int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
SMeta * pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
STbDbKey tbDbKey = {.version = version, .uid = uid}; STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db // query table.db
...@@ -54,7 +54,7 @@ _err: ...@@ -54,7 +54,7 @@ _err:
} }
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
SMeta * pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
int64_t version; int64_t version;
// query uid.idx // query uid.idx
...@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { ...@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
} }
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
SMeta * pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
tb_uid_t uid; tb_uid_t uid;
// query name.idx // query name.idx
...@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { ...@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
} }
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
void * pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
tb_uid_t uid = 0; tb_uid_t uid = 0;
...@@ -138,7 +138,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { ...@@ -138,7 +138,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
int metaTbCursorNext(SMTbCursor *pTbCur) { int metaTbCursorNext(SMTbCursor *pTbCur) {
int ret; int ret;
void * pBuf; void *pBuf;
STbCfg tbCfg; STbCfg tbCfg;
for (;;) { for (;;) {
...@@ -159,7 +159,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) { ...@@ -159,7 +159,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
void * pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int64_t version; int64_t version;
SSchemaWrapper schema = {0}; SSchemaWrapper schema = {0};
...@@ -218,9 +218,9 @@ _err: ...@@ -218,9 +218,9 @@ _err:
return NULL; return NULL;
} }
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
TBC * pCur; TBC *pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL); int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) { if (ret < 0) {
return ret; return ret;
} }
...@@ -235,13 +235,13 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ ...@@ -235,13 +235,13 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
} }
void *pKey = NULL; void *pKey = NULL;
int kLen = 0; int kLen = 0;
while(1){ while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL); ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) { if (ret < 0) {
break; break;
} }
ttlKey = *(STtlIdxKey*)pKey; ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid); taosArrayPush(uidList, &ttlKey.uid);
} }
tdbTbcClose(pCur); tdbTbcClose(pCur);
...@@ -252,11 +252,11 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ ...@@ -252,11 +252,11 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
} }
struct SMCtbCursor { struct SMCtbCursor {
SMeta * pMeta; SMeta *pMeta;
TBC * pCur; TBC *pCur;
tb_uid_t suid; tb_uid_t suid;
void * pKey; void *pKey;
void * pVal; void *pVal;
int kLen; int kLen;
int vLen; int vLen;
}; };
...@@ -388,15 +388,15 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) { ...@@ -388,15 +388,15 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
if (ret < 0) { if (ret < 0) {
return 0; return 0;
} }
return *(tb_uid_t*)pStbCur->pKey; return *(tb_uid_t *)pStbCur->pKey;
} }
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
// SMetaReader mr = {0}; // SMetaReader mr = {0};
STSchema * pTSchema = NULL; STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0}; STSchemaBuilder sb = {0};
SSchema * pSchema; SSchema *pSchema;
pSW = metaGetTableSchema(pMeta, uid, sver, 0); pSW = metaGetTableSchema(pMeta, uid, sver, 0);
if (!pSW) return NULL; if (!pSW) return NULL;
...@@ -415,6 +415,51 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -415,6 +415,51 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
return pTSchema; return pTSchema;
} }
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
int32_t code = 0;
STSchema *pTSchema = NULL;
SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
void *pData = NULL;
int nData = 0;
// query
metaRLock(pMeta);
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
code = TSDB_CODE_NOT_FOUND;
metaULock(pMeta);
goto _err;
}
metaULock(pMeta);
// decode
SDecoder dc = {0};
SSchemaWrapper schema;
SSchemaWrapper *pSchemaWrapper = &schema;
tDecoderInit(&dc, pData, nData);
tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
tDecoderClear(&dc);
// convert
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
for (int i = 0; i < pSchemaWrapper->nCols; i++) {
SSchema *pSchema = pSchemaWrapper->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
*ppTSchema = pTSchema;
taosMemoryFree(pSchemaWrapper->pSchema);
return code;
_err:
*ppTSchema = NULL;
return code;
}
int metaGetTbNum(SMeta *pMeta) { int metaGetTbNum(SMeta *pMeta) {
// TODO // TODO
// ASSERT(0); // ASSERT(0);
...@@ -422,11 +467,11 @@ int metaGetTbNum(SMeta *pMeta) { ...@@ -422,11 +467,11 @@ int metaGetTbNum(SMeta *pMeta) {
} }
typedef struct { typedef struct {
SMeta * pMeta; SMeta *pMeta;
TBC * pCur; TBC *pCur;
tb_uid_t uid; tb_uid_t uid;
void * pKey; void *pKey;
void * pVal; void *pVal;
int kLen; int kLen;
int vLen; int vLen;
} SMSmaCursor; } SMSmaCursor;
...@@ -498,7 +543,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) { ...@@ -498,7 +543,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL; STSmaWrapper *pSW = NULL;
SArray * pSmaIds = NULL; SArray *pSmaIds = NULL;
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) { if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL; return NULL;
...@@ -522,7 +567,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { ...@@ -522,7 +567,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
int64_t smaId; int64_t smaId;
int smaIdx = 0; int smaIdx = 0;
STSma * pTSma = NULL; STSma *pTSma = NULL;
for (int i = 0; i < pSW->number; ++i) { for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) { if (metaGetTableEntryByUid(&mr, smaId) < 0) {
...@@ -570,7 +615,7 @@ _err: ...@@ -570,7 +615,7 @@ _err:
} }
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma * pTSma = NULL; STSma *pTSma = NULL;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) { if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
...@@ -592,7 +637,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { ...@@ -592,7 +637,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
} }
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) { SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
SArray * pUids = NULL; SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL; SSmaIdxKey *pSmaIdxKey = NULL;
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
...@@ -630,7 +675,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) { ...@@ -630,7 +675,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
} }
SArray *metaGetSmaTbUids(SMeta *pMeta) { SArray *metaGetSmaTbUids(SMeta *pMeta) {
SArray * pUids = NULL; SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL; SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t lastUid = 0; tb_uid_t lastUid = 0;
...@@ -689,20 +734,20 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) { ...@@ -689,20 +734,20 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
} }
typedef struct { typedef struct {
SMeta * pMeta; SMeta *pMeta;
TBC * pCur; TBC *pCur;
tb_uid_t suid; tb_uid_t suid;
int16_t cid; int16_t cid;
int16_t type; int16_t type;
void * pKey; void *pKey;
void * pVal; void *pVal;
int32_t kLen; int32_t kLen;
int32_t vLen; int32_t vLen;
} SIdxCursor; } SIdxCursor;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
SIdxCursor *pCursor = NULL; SIdxCursor *pCursor = NULL;
char * buf = NULL; char *buf = NULL;
int32_t maxSize = 0; int32_t maxSize = 0;
int32_t ret = 0, valid = 0; int32_t ret = 0, valid = 0;
...@@ -721,7 +766,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -721,7 +766,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t nKey = 0; int32_t nKey = 0;
int32_t nTagData = 0; int32_t nTagData = 0;
void * tagData = NULL; void *tagData = NULL;
if (param->val == NULL) { if (param->val == NULL) {
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
...@@ -757,7 +802,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -757,7 +802,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto END; goto END;
} }
void * entryKey = NULL, *entryVal = NULL; void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal; int32_t nEntryKey, nEntryVal;
bool first = true; bool first = true;
while (1) { while (1) {
......
...@@ -523,6 +523,17 @@ static void tdDestroySDataBlockArray(SArray *pArray) { ...@@ -523,6 +523,17 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
if (level == TSDB_RETENTION_L0) {
return pSma->pVnode->state.applied;
}
SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv));
return atomic_load_64(&pRSmaStat->submitVer);
}
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
SArray *pResult = NULL; SArray *pResult = NULL;
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
...@@ -562,7 +573,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -562,7 +573,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
goto _err; goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pRSmaInfo->pStat->submitVer, 1), pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
goto _err; goto _err;
} }
...@@ -814,6 +825,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { ...@@ -814,6 +825,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
} }
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
*committed = 0;
if (pVnode->state.committed > 0) { if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode), smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
...@@ -828,6 +840,18 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { ...@@ -828,6 +840,18 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err; goto _err;
} }
STFInfo tFileInfo = {0};
if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
goto _err;
}
ASSERT(tFileInfo.qTaskInfo.submitVer > 0);
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer);
smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer);
SRSmaQTaskInfoIter fIter = {0}; SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter); tdRSmaQTaskInfoIterDestroy(&fIter);
...@@ -1094,6 +1118,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { ...@@ -1094,6 +1118,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
} }
STFile tFile = {0}; STFile tFile = {0};
if (RSMA_SUBMIT_VER(pRSmaStat) > 0) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true;
}
while (infoHash) { while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
...@@ -1129,12 +1169,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { ...@@ -1129,12 +1169,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err; goto _err;
} }
if (tdCreateTFile(&tFile, true, -1) < 0) { if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err; goto _err;
} }
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
i + 1, TD_TFILE_FULL_NAME(&tFile)); TD_TFILE_FULL_NAME(&tFile));
isFileCreated = true; isFileCreated = true;
} }
...@@ -1161,6 +1201,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { ...@@ -1161,6 +1201,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
} }
if (isFileCreated) { if (isFileCreated) {
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->submitVer);
if (tdUpdateTFileHeader(&tFile) < 0) { if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno)); tstrerror(terrno));
......
...@@ -32,6 +32,9 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { ...@@ -32,6 +32,9 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen += taosEncodeFixedU32(buf, pInfo->ftype); tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize); tlen += taosEncodeFixedI64(buf, pInfo->fsize);
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer);
}
return tlen; return tlen;
} }
...@@ -41,6 +44,11 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { ...@@ -41,6 +44,11 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver)); buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize)); buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
// specific
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer));
}
return buf; return buf;
} }
......
...@@ -462,8 +462,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -462,8 +462,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vg %d, tq try get suid: %ld", TD_VID(pTq->pVnode), req.suid); tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid); tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
......
...@@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) {
.reader = handle.execHandle.pExecReader[i], .reader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
}; };
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
ASSERT(handle.execHandle.execCol.task[i]); ASSERT(handle.execHandle.execCol.task[i]);
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"
typedef struct SLastrowReader {
SVnode* pVnode;
STSchema* pSchema;
uint64_t uid;
// int32_t* pSlotIds;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list
} SLastrowReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
int32_t numOfRows = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
SColVal colVal = {0};
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
} else {
tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (colVal.isNull || colVal.isNone) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[i], colVal.value.nData);
memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone);
}
}
}
pBlock->info.rows += 1;
}
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols,
void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->type = type;
p->pVnode = pVnode;
p->numOfCols = numOfCols;
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
p->pTableList = pTableIdList;
for(int32_t i = 0; i < p->numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(colId[i])) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
}
}
*pReader = p;
return TSDB_CODE_SUCCESS;
}
int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader;
for (int32_t i = 0; i < p->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]);
}
taosMemoryFree(p->transferBuf);
taosMemoryFree(pReader);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SLastrowReader* pr = pReader;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
// retrieve the only one last row of all tables in the uid list.
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (h == NULL) {
continue;
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
}
saveOneRow(pRow, pResBlock, pr, slotIds);
internalResult = true;
lastKey = pRow->ts;
}
tsdbCacheRelease(lruCache, h);
}
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (h == NULL) {
continue;
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
saveOneRow(pRow, pResBlock, pr, slotIds);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
}
}
} else {
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
此差异已折叠。
此差异已折叠。
...@@ -28,7 +28,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * ...@@ -28,7 +28,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
// scan and convert // scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d, failed to insert data since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
} }
return -1; return -1;
} }
...@@ -77,7 +77,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro ...@@ -77,7 +77,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
if (rowKey < minKey || rowKey > maxKey) { if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64, " maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey); TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1; return -1;
} }
...@@ -92,7 +92,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -92,7 +92,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
SSubmitBlk *pBlock = NULL; SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSRow *row = NULL; STSRow *row = NULL;
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb); STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision); TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days; TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days;
......
...@@ -37,6 +37,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -37,6 +37,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.config = *pCfg; info.config = *pCfg;
info.state.committed = -1; info.state.committed = -1;
info.state.applied = -1; info.state.applied = -1;
info.state.commitID = 0;
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno)); vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
...@@ -79,6 +80,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -79,6 +80,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->config = info.config; pVnode->config = info.config;
pVnode->state.committed = info.state.committed; pVnode->state.committed = info.state.committed;
pVnode->state.applied = info.state.committed; pVnode->state.applied = info.state.committed;
pVnode->state.commitID = info.state.commitID;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
pVnode->blockCount = 0; pVnode->blockCount = 0;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit 50b68d85f7cbaf7a9adfa4082e88ca758770f75e Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01
Subproject commit c3815951fc80617ecd171f3743b8b4a4d0bc712e Subproject commit c885e967e490105999b84d009a15168728dfafaf
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册