提交 05a39977 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feat/caseWhen

...@@ -162,17 +162,7 @@ struct STSRowBuilder { ...@@ -162,17 +162,7 @@ struct STSRowBuilder {
struct SValue { struct SValue {
union { union {
int8_t i8; // TSDB_DATA_TYPE_BOOL||TSDB_DATA_TYPE_TINYINT int64_t val;
uint8_t u8; // TSDB_DATA_TYPE_UTINYINT
int16_t i16; // TSDB_DATA_TYPE_SMALLINT
uint16_t u16; // TSDB_DATA_TYPE_USMALLINT
int32_t i32; // TSDB_DATA_TYPE_INT
uint32_t u32; // TSDB_DATA_TYPE_UINT
int64_t i64; // TSDB_DATA_TYPE_BIGINT
uint64_t u64; // TSDB_DATA_TYPE_UBIGINT
TSKEY ts; // TSDB_DATA_TYPE_TIMESTAMP
float f; // TSDB_DATA_TYPE_FLOAT
double d; // TSDB_DATA_TYPE_DOUBLE
struct { struct {
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
......
...@@ -333,10 +333,10 @@ typedef struct tDataTypeDescriptor { ...@@ -333,10 +333,10 @@ typedef struct tDataTypeDescriptor {
char *name; char *name;
int64_t minValue; int64_t minValue;
int64_t maxValue; int64_t maxValue;
int32_t (*compFunc)(const char *const input, int32_t inputSize, const int32_t nelements, char *const output, int32_t (*compFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t outputSize, char algorithm, char *const buffer, int32_t bufferSize); int32_t nBuf);
int32_t (*decompFunc)(const char *const input, int32_t compressedSize, const int32_t nelements, char *const output, int32_t (*decompFunc)(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t outputSize, char algorithm, char *const buffer, int32_t bufferSize); int32_t nBuf);
void (*statisFunc)(int8_t bitmapMode, const void *pBitmap, const void *pData, int32_t numofrow, int64_t *min, void (*statisFunc)(int8_t bitmapMode, const void *pBitmap, const void *pData, int32_t numofrow, int64_t *min,
int64_t *max, int64_t *sum, int16_t *minindex, int16_t *maxindex, int16_t *numofnull); int64_t *max, int64_t *sum, int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
} tDataTypeDescriptor; } tDataTypeDescriptor;
...@@ -356,7 +356,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type); ...@@ -356,7 +356,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
void *getDataMin(int32_t type); void *getDataMin(int32_t type);
void *getDataMax(int32_t type); void *getDataMax(int32_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
#endif #endif
#define ENCODE_LIMIT (((uint8_t)1) << 7) #define ENCODE_LIMIT (((uint8_t)1) << 7)
#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode #define ZIGZAGE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode
#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode #define ZIGZAGD(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode
/* ------------------------ LEGACY CODES ------------------------ */ /* ------------------------ LEGACY CODES ------------------------ */
#if 1 #if 1
...@@ -70,7 +70,7 @@ static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) { ...@@ -70,7 +70,7 @@ static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) {
} }
static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) { static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) {
*value = ( (((int8_t *)buf)[0] == 0) ? false : true ); *value = ((((int8_t *)buf)[0] == 0) ? false : true);
return POINTER_SHIFT(buf, sizeof(int8_t)); return POINTER_SHIFT(buf, sizeof(int8_t));
} }
......
...@@ -51,287 +51,12 @@ extern "C" { ...@@ -51,287 +51,12 @@ extern "C" {
#define HEAD_MODE(x) x % 2 #define HEAD_MODE(x) x % 2
#define HEAD_ALGO(x) x / 2 #define HEAD_ALGO(x) x / 2
extern int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type);
extern int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output,
const char type);
extern int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize);
extern int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output,
int32_t outputSize);
extern int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
extern int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output);
// lossy
extern int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output);
extern int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
char *const output);
extern int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output);
extern int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
char *const output);
#ifdef TD_TSZ #ifdef TD_TSZ
extern bool lossyFloat; extern bool lossyFloat;
extern bool lossyDouble; extern bool lossyDouble;
int32_t tsCompressInit(); int32_t tsCompressInit();
void tsCompressExit(); void tsCompressExit();
#endif
static FORCE_INLINE int32_t tsCompressTinyint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressTinyint(const char *const input, int32_t compressedSize,
const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressSmallint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressSmallint(const char *const input, int32_t compressedSize,
const int32_t nelements, char *const output, int32_t outputSize,
char algorithm, char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressInt(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressInt(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressBigint(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressBigint(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressBool(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressBoolImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsDecompressBool(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressBoolImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressBoolImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
}
static FORCE_INLINE int32_t tsCompressString(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
return tsCompressStringImp(input, inputSize, output, outputSize);
}
static FORCE_INLINE int32_t tsDecompressString(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
return tsDecompressStringImp(input, compressedSize, output, outputSize);
}
static FORCE_INLINE int32_t tsCompressFloat(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
#ifdef TD_TSZ
// lossy mode
if (lossyFloat) {
return tsCompressFloatLossyImp(input, nelements, output);
// lossless mode
} else {
#endif
if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
static FORCE_INLINE int32_t tsDecompressFloat(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
#ifdef TD_TSZ
if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressFloatLossyImp(input, compressedSize, nelements, output);
} else {
#endif
// decompress lossless
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressFloatImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
static FORCE_INLINE int32_t tsCompressDouble(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const buffer,
int32_t bufferSize) {
#ifdef TD_TSZ
if (lossyDouble) {
// lossy mode
return tsCompressDoubleLossyImp(input, nelements, output);
} else {
#endif
// lossless mode
if (algorithm == ONE_STAGE_COMP) {
return tsCompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
int32_t len = tsCompressDoubleImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
static FORCE_INLINE int32_t tsDecompressDouble(const char *const input, int32_t compressedSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) {
#ifdef TD_TSZ
if (HEAD_ALGO(input[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressDoubleLossyImp(input, compressedSize, nelements, output);
} else {
#endif
// decompress lossless
if (algorithm == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1;
return tsDecompressDoubleImp(buffer, nelements, output);
} else {
assert(0);
return -1;
}
#ifdef TD_TSZ
}
#endif
}
#ifdef TD_TSZ
//
// lossy float double
//
static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements, static FORCE_INLINE int32_t tsCompressFloatLossy(const char *const input, int32_t inputSize, const int32_t nelements,
char *const output, int32_t outputSize, char algorithm, char *const output, int32_t outputSize, char algorithm,
char *const buffer, int32_t bufferSize) { char *const buffer, int32_t bufferSize) {
...@@ -358,33 +83,56 @@ static FORCE_INLINE int32_t tsDecompressDoubleLossy(const char *const input, int ...@@ -358,33 +83,56 @@ static FORCE_INLINE int32_t tsDecompressDoubleLossy(const char *const input, int
#endif #endif
static FORCE_INLINE int32_t tsCompressTimestamp(const char *const input, int32_t inputSize, const int32_t nelements, /*************************************************************************
char *const output, int32_t outputSize, char algorithm, * REGULAR COMPRESSION
char *const buffer, int32_t bufferSize) { *************************************************************************/
if (algorithm == ONE_STAGE_COMP) { int32_t tsCompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
return tsCompressTimestampImp(input, nelements, output); int32_t nBuf);
} else if (algorithm == TWO_STAGE_COMP) { int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
int32_t len = tsCompressTimestampImp(input, nelements, buffer); void *pBuf, int32_t nBuf);
return tsCompressStringImp(buffer, len, output, outputSize); int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
} else { int32_t nBuf);
assert(0); int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
return -1; int32_t nBuf);
} int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
} int32_t nBuf);
int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
static FORCE_INLINE int32_t tsDecompressTimestamp(const char *const input, int32_t compressedSize, int32_t nBuf);
const int32_t nelements, char *const output, int32_t outputSize, int32_t tsCompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
char algorithm, char *const buffer, int32_t bufferSize) { int32_t nBuf);
if (algorithm == ONE_STAGE_COMP) { int32_t tsDecompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
return tsDecompressTimestampImp(input, nelements, output); int32_t nBuf);
} else if (algorithm == TWO_STAGE_COMP) { int32_t tsCompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
if (tsDecompressStringImp(input, compressedSize, buffer, bufferSize) < 0) return -1; int32_t nBuf);
return tsDecompressTimestampImp(buffer, nelements, output); int32_t tsDecompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
} else { int32_t nBuf);
assert(0); int32_t tsCompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
return -1; int32_t nBuf);
} int32_t tsDecompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
} int32_t nBuf);
int32_t tsCompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
/*************************************************************************
* STREAM COMPRESSION
*************************************************************************/
typedef struct SCompressor SCompressor;
int32_t tCompressorCreate(SCompressor **ppCmprsor);
int32_t tCompressorDestroy(SCompressor *pCmprsor);
int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin);
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
此差异已折叠。
...@@ -689,7 +689,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { ...@@ -689,7 +689,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData); memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
val = varBuf; val = varBuf;
} else { } else {
val = (const void *)&pColVal->value.i64; val = (const void *)&pColVal->value.val;
} }
} else { } else {
pColVal = NULL; pColVal = NULL;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if 0
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <taoserror.h> #include <taoserror.h>
...@@ -476,4 +477,5 @@ TEST(testCase, NoneTest) { ...@@ -476,4 +477,5 @@ TEST(testCase, NoneTest) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
} }
#endif
#endif #endif
\ No newline at end of file
...@@ -42,9 +42,23 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -42,9 +42,23 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (pObj->sql != NULL) {
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; } else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
}
if (pObj->ast != NULL) {
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
}
if (pObj->physicalPlan != NULL) {
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
}
int32_t sz = taosArrayGetSize(pObj->tasks); int32_t sz = taosArrayGetSize(pObj->tasks);
if (tEncodeI32(pEncoder, sz) < 0) return -1; if (tEncodeI32(pEncoder, sz) < 0) return -1;
......
...@@ -554,7 +554,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { ...@@ -554,7 +554,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) { for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
SField *pField1 = taosArrayGet(pCreate->pColumns, i); SField *pField1 = taosArrayGet(pCreate->pColumns, i);
if (pField1->type < 0) { if (pField1->type >= TSDB_DATA_TYPE_MAX) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -570,7 +570,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { ...@@ -570,7 +570,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
for (int32_t i = 0; i < pCreate->numOfTags; ++i) { for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
SField *pField1 = taosArrayGet(pCreate->pTags, i); SField *pField1 = taosArrayGet(pCreate->pTags, i);
if (pField1->type < 0) { if (pField1->type >= TSDB_DATA_TYPE_MAX) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -982,8 +982,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { ...@@ -982,8 +982,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
} else { } else {
mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid", createReq.name, mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid, origin tagVer:%d colVer:%d",
createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer); createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER; terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
goto _OVER; goto _OVER;
} }
...@@ -1603,9 +1603,9 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa ...@@ -1603,9 +1603,9 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return -1; return -1;
} }
strcpy(pRsp->dbFName, pStb->db); tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
strcpy(pRsp->tbName, tbName); tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
strcpy(pRsp->stbName, tbName); tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
pRsp->dbId = pDb->uid; pRsp->dbId = pDb->uid;
pRsp->numOfTags = pStb->numOfTags; pRsp->numOfTags = pStb->numOfTags;
pRsp->numOfColumns = pStb->numOfColumns; pRsp->numOfColumns = pStb->numOfColumns;
...@@ -1649,9 +1649,9 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, ...@@ -1649,9 +1649,9 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
return -1; return -1;
} }
strcpy(pRsp->dbFName, pStb->db); tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
strcpy(pRsp->tbName, tbName); tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
strcpy(pRsp->stbName, tbName); tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
pRsp->numOfTags = pStb->numOfTags; pRsp->numOfTags = pStb->numOfTags;
pRsp->numOfColumns = pStb->numOfColumns; pRsp->numOfColumns = pStb->numOfColumns;
pRsp->tableType = TSDB_SUPER_TABLE; pRsp->tableType = TSDB_SUPER_TABLE;
...@@ -2551,7 +2551,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc ...@@ -2551,7 +2551,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false); colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
char rollup[128 + VARSTR_HEADER_SIZE] = {0}; char rollup[160 + VARSTR_HEADER_SIZE] = {0};
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs); int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
for (int32_t i = 0; i < rollupNum; ++i) { for (int32_t i = 0; i < rollupNum; ++i) {
char *funcName = taosArrayGet(pStb->pFuncs, i); char *funcName = taosArrayGet(pStb->pFuncs, i);
......
...@@ -425,8 +425,10 @@ static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStream ...@@ -425,8 +425,10 @@ static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStream
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
streamObj.status = STREAM_STATUS__RECOVER; streamObj.status = STREAM_STATUS__RECOVER;
SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj); SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -771,12 +773,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -771,12 +773,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1; return -1;
} }
// drop stream // drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1; return -1;
} }
...@@ -945,10 +949,8 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -945,10 +949,8 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SName n; SName n;
int32_t cols = 0; int32_t cols = 0;
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->name, T_NAME_ACCT | T_NAME_DB); STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
tNameGetDbName(&n, varDataVal(streamName));
varDataSetLen(streamName, strlen(varDataVal(streamName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
...@@ -956,28 +958,24 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -956,28 +958,24 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->createTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(&sql[VARSTR_HEADER_SIZE], pStream->sql, TSDB_SHOW_SQL_LEN); STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)sql, false); colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
mndShowStreamStatus(&status[VARSTR_HEADER_SIZE], pStream); char status2[20] = {0};
varDataSetLen(status, strlen(varDataVal(status))); mndShowStreamStatus(status2, pStream);
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&status, false); colDataAppend(pColInfo, numOfRows, (const char *)&status, false);
char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->sourceDb, T_NAME_ACCT | T_NAME_DB); STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
tNameGetDbName(&n, varDataVal(sourceDB));
varDataSetLen(sourceDB, strlen(varDataVal(sourceDB)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&sourceDB, false); colDataAppend(pColInfo, numOfRows, (const char *)&sourceDB, false);
char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->targetDb, T_NAME_ACCT | T_NAME_DB); STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
tNameGetDbName(&n, varDataVal(targetDB));
varDataSetLen(targetDB, strlen(varDataVal(targetDB)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&targetDB, false); colDataAppend(pColInfo, numOfRows, (const char *)&targetDB, false);
...@@ -986,9 +984,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -986,9 +984,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend(pColInfo, numOfRows, NULL, true); colDataAppend(pColInfo, numOfRows, NULL, true);
} else { } else {
char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->targetSTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
strcpy(&targetSTB[VARSTR_HEADER_SIZE], tNameGetTableName(&n));
varDataSetLen(targetSTB, strlen(varDataVal(targetSTB)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&targetSTB, false); colDataAppend(pColInfo, numOfRows, (const char *)&targetSTB, false);
} }
...@@ -997,8 +993,9 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -997,8 +993,9 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
char trigger[20 + VARSTR_HEADER_SIZE] = {0}; char trigger[20 + VARSTR_HEADER_SIZE] = {0};
mndShowStreamTrigger(&trigger[VARSTR_HEADER_SIZE], pStream); char trigger2[20] = {0};
varDataSetLen(trigger, strlen(varDataVal(trigger))); mndShowStreamTrigger(trigger2, pStream);
STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&trigger, false); colDataAppend(pColInfo, numOfRows, (const char *)&trigger, false);
......
...@@ -96,7 +96,7 @@ void metaReaderClear(SMetaReader *pReader); ...@@ -96,7 +96,7 @@ void metaReaderClear(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int metaGetTableEntryByName(SMetaReader *pReader, const char *name); int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags); int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
int32_t metaGetTableTagsOpt(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags); int32_t metaGetTableTagsByUids(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
int32_t metaReadNext(SMetaReader *pReader); int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
...@@ -159,7 +159,7 @@ uint64_t getReaderMaxVersion(STsdbReader *pReader); ...@@ -159,7 +159,7 @@ uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void* tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
......
...@@ -38,39 +38,43 @@ extern "C" { ...@@ -38,39 +38,43 @@ extern "C" {
goto LABEL; \ goto LABEL; \
} }
typedef struct TSDBROW TSDBROW; typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID; typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY; typedef struct TSDBKEY TSDBKEY;
typedef struct SDelData SDelData; typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx; typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData; typedef struct STbData STbData;
typedef struct SMemTable SMemTable; typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter; typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData; typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx; typedef struct SBlockIdx SBlockIdx;
typedef struct SDataBlk SDataBlk; typedef struct SDataBlk SDataBlk;
typedef struct SSttBlk SSttBlk; typedef struct SSttBlk SSttBlk;
typedef struct SDiskDataHdr SDiskDataHdr; typedef struct SDiskDataHdr SDiskDataHdr;
typedef struct SBlockData SBlockData; typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile; typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile; typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile; typedef struct SDataFile SDataFile;
typedef struct SSttFile SSttFile; typedef struct SSttFile SSttFile;
typedef struct SSmaFile SSmaFile; typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet; typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter; typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader; typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter; typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader; typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter; typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger; typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap; typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo; typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo; typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol; typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange; typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter; typedef struct SLDataIter SLDataIter;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SBlkInfo SBlkInfo;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
...@@ -170,7 +174,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut ...@@ -170,7 +174,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut
int32_t aBufN[]); int32_t aBufN[]);
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]); int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
// SDiskDataHdr // SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, void *ph); int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph); int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
// SDelIdx // SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tPutDelIdx(uint8_t *p, void *ph);
...@@ -267,6 +271,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p ...@@ -267,6 +271,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast); int8_t cmprAlg, int8_t toLast);
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader // SDataFReader
...@@ -300,7 +305,7 @@ int32_t tsdbMerge(STsdb *pTsdb); ...@@ -300,7 +305,7 @@ int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) #define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) #define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache // tsdbCache ==============================================================================================
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
...@@ -318,6 +323,15 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode); ...@@ -318,6 +323,15 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder);
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg,
uint8_t calcSma);
int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder);
int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId);
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo);
// structs ======================= // structs =======================
struct STsdbFS { struct STsdbFS {
SDelFile *pDelFile; SDelFile *pDelFile;
...@@ -438,6 +452,17 @@ struct SSmaInfo { ...@@ -438,6 +452,17 @@ struct SSmaInfo {
int32_t size; int32_t size;
}; };
struct SBlkInfo {
int64_t minUid;
int64_t maxUid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
TSDBKEY minTKey;
TSDBKEY maxTKey;
};
struct SDataBlk { struct SDataBlk {
TSDBKEY minKey; TSDBKEY minKey;
TSDBKEY maxKey; TSDBKEY maxKey;
...@@ -661,6 +686,38 @@ typedef struct { ...@@ -661,6 +686,38 @@ typedef struct {
STSchema *pTSchema; STSchema *pTSchema;
} SSkmInfo; } SSkmInfo;
struct SDiskCol {
SBlockCol bCol;
const uint8_t *pBit;
const uint8_t *pOff;
const uint8_t *pVal;
SColumnDataAgg agg;
};
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aDiskCol; // SArray<SDiskCol>
};
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
uint8_t cmprAlg;
uint8_t calcSma;
SCompressor *pUidC;
SCompressor *pVerC;
SCompressor *pKeyC;
int32_t nBuilder;
SArray *aBuilder; // SArray<SDiskColBuilder>
uint8_t *aBuf[2];
SDiskData dd;
SBlkInfo bi;
};
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr); bool destroyLoadInfo, const char *idStr);
......
...@@ -207,17 +207,17 @@ int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) { ...@@ -207,17 +207,17 @@ int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0); metaReaderInit(&mr, (SMeta *)meta, 0);
SMeta *pMeta = mr.pMeta;
SMetaReader *pReader = &mr; SMetaReader *pReader = &mr;
// query name.idx // query name.idx
if (tdbTbGet(pMeta->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { if (tdbTbGet(pReader->pMeta->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
} }
*uid = *(tb_uid_t *)pReader->pBuf; *uid = *(tb_uid_t *)pReader->pBuf;
metaReaderClear(&mr); metaReaderClear(&mr);
return 0; return 0;
...@@ -228,11 +228,11 @@ int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) { ...@@ -228,11 +228,11 @@ int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0); metaReaderInit(&mr, (SMeta *)meta, 0);
if (metaGetTableEntryByName(&mr, tbName) == 0) { code = metaGetTableEntryByName(&mr, tbName);
*tbType = mr.me.type; if (code == 0) *tbType = mr.me.type;
}
metaReaderClear(&mr); metaReaderClear(&mr);
return 0; return code;
} }
int metaReadNext(SMetaReader *pReader) { int metaReadNext(SMetaReader *pReader) {
...@@ -1134,21 +1134,49 @@ END: ...@@ -1134,21 +1134,49 @@ END:
return ret; return ret;
} }
int32_t metaGetTableTagsOpt(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) { static int32_t metaGetTableTagByUid(SMeta *pMeta, uint64_t suid, uint64_t uid, void **tag, int32_t *len, bool lock) {
int ret = 0;
if (lock) {
metaRLock(pMeta);
}
SCtbIdxKey ctbIdxKey = {.suid = suid, .uid = uid};
ret = tdbTbGet(pMeta->pCtbIdx, &ctbIdxKey, sizeof(SCtbIdxKey), tag, len);
if (lock) {
metaULock(pMeta);
}
return ret;
}
int32_t metaGetTableTagsByUids(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
const int32_t LIMIT = 128;
int32_t isLock = false;
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0; int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
tb_uid_t *id = taosArrayGet(uidList, i); tb_uid_t *id = taosArrayGet(uidList, i);
SCtbIdxKey ctbIdxKey = {.suid = suid, .uid = *id};
if (i % LIMIT == 0) {
void *val = NULL; if (isLock) metaULock(pMeta);
int32_t len = 0;
if (taosHashGet(tags, id, sizeof(tb_uid_t)) == NULL && metaRLock(pMeta);
0 == tdbTbGet(pMeta->pCtbIdx, &ctbIdxKey, sizeof(SCtbIdxKey), &val, &len)) { isLock = true;
taosHashPut(tags, id, sizeof(tb_uid_t), val, len); }
if (taosHashGet(tags, id, sizeof(tb_uid_t)) == NULL) {
void *val = NULL;
int32_t len = 0;
if (metaGetTableTagByUid(pMeta, suid, *id, &val, &len, false) == 0) {
taosHashPut(tags, id, sizeof(tb_uid_t), val, len);
tdbFree(val);
}
} }
} }
if (isLock) metaULock(pMeta);
return 0; return 0;
} }
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) { int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid); SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid);
......
...@@ -260,7 +260,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb ...@@ -260,7 +260,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) { if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = keyTs}); SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal}); taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
} }
...@@ -447,7 +447,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -447,7 +447,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) { if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else { } else {
// tMergeTreeClose(&state->mergeTree); tMergeTreeClose(&state->mergeTree);
*ppRow = NULL; *ppRow = NULL;
return code; return code;
...@@ -463,7 +463,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -463,7 +463,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
// tMergeTreeClose(&state->mergeTree); tMergeTreeClose(&state->mergeTree);
goto _next_fileset; goto _next_fileset;
} }
state->state = SFSLASTNEXTROW_BLOCKROW; state->state = SFSLASTNEXTROW_BLOCKROW;
...@@ -590,7 +590,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -590,7 +590,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
goto _next_fileset; goto _next_fileset;
} }
tMapDataReset(&state->blockMap); if (state->blockMap.pData != NULL) {
tMapDataClear(&state->blockMap);
}
code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap); code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap);
if (code) goto _err; if (code) goto _err;
...@@ -695,6 +698,10 @@ int32_t clearNextRowFromFS(void *iter) { ...@@ -695,6 +698,10 @@ int32_t clearNextRowFromFS(void *iter) {
state->pBlockData = NULL; state->pBlockData = NULL;
} }
if (state->blockMap.pData != NULL) {
tMapDataClear(&state->blockMap);
}
return code; return code;
} }
...@@ -1052,7 +1059,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -1052,7 +1059,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
lastRowTs = TSDBROW_TS(pRow); lastRowTs = TSDBROW_TS(pRow);
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs}); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, pColVal) == NULL) { if (taosArrayPush(pColArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -1151,7 +1158,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { ...@@ -1151,7 +1158,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
lastRowTs = rowTs; lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs}); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) { if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
......
...@@ -926,6 +926,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -926,6 +926,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
} }
} }
// fill the mis-matched columns with null value
while (i < numOfOutputCols) { while (i < numOfOutputCols) {
pColData = taosArrayGet(pResBlock->pDataBlock, i); pColData = taosArrayGet(pResBlock->pDataBlock, i);
colDataAppendNNULL(pColData, 0, remain); colDataAppendNNULL(pColData, 0, remain);
...@@ -935,12 +936,15 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -935,12 +936,15 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
pResBlock->info.rows = remain; pResBlock->info.rows = remain;
pDumpInfo->rowIndex += step * remain; pDumpInfo->rowIndex += step * remain;
// check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) { if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
// int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
// setBlockAllDumped(pDumpInfo, ts, pReader->order); if (outOfTimeWindow(ts, &pReader->window)) { // the remain data has out of query time window, ignore current block
setBlockAllDumped(pDumpInfo, ts, pReader->order);
}
} else { } else {
int64_t k = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
setBlockAllDumped(pDumpInfo, k, pReader->order); setBlockAllDumped(pDumpInfo, ts, pReader->order);
} }
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
......
...@@ -128,7 +128,7 @@ _exit: ...@@ -128,7 +128,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) {
int32_t code = 0; int32_t code = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
...@@ -522,9 +522,6 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, ...@@ -522,9 +522,6 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
// write // write
if (pSmaInfo->size) { if (pSmaInfo->size) {
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size); code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err; if (code) goto _err;
...@@ -607,6 +604,132 @@ _err: ...@@ -607,6 +604,132 @@ _err:
return code; return code;
} }
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo) {
int32_t code = 0;
int32_t lino = 0;
STsdbFD *pFD = NULL;
if (pSmaInfo) {
pFD = pWriter->pDataFD;
pBlkInfo->offset = pWriter->fData.size;
} else {
pFD = pWriter->pSttFD;
pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size;
}
pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0;
// hdr
int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr);
code = tRealloc(&pWriter->aBuf[0], n);
TSDB_CHECK_CODE(code, lino, _exit);
tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr);
code = tsdbWriteFile(pFD, pBlkInfo->offset, pWriter->aBuf[0], n);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += n;
pBlkInfo->szBlock += n;
// uid + ver + key
if (pDiskData->pUid) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pUid, pDiskData->hdr.szUid);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += pDiskData->hdr.szUid;
pBlkInfo->szBlock += pDiskData->hdr.szUid;
}
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pVer, pDiskData->hdr.szVer);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += pDiskData->hdr.szVer;
pBlkInfo->szBlock += pDiskData->hdr.szVer;
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pKey, pDiskData->hdr.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szKey += pDiskData->hdr.szKey;
pBlkInfo->szBlock += pDiskData->hdr.szKey;
// aBlockCol
if (pDiskData->hdr.szBlkCol) {
code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
n = 0;
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
n += tPutBlockCol(pWriter->aBuf[0] + n, pDiskCol);
}
ASSERT(n == pDiskData->hdr.szBlkCol);
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskData->hdr.szBlkCol;
}
// aDiskCol
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
if (pDiskCol->pBit) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pBit, pDiskCol->bCol.szBitmap);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskCol->bCol.szBitmap;
}
if (pDiskCol->pOff) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pOff, pDiskCol->bCol.szOffset);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskCol->bCol.szOffset;
}
if (pDiskCol->pVal) {
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pVal, pDiskCol->bCol.szValue);
TSDB_CHECK_CODE(code, lino, _exit);
pBlkInfo->szBlock += pDiskCol->bCol.szValue;
}
}
if (pSmaInfo) {
pWriter->fData.size += pBlkInfo->szBlock;
} else {
pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock;
goto _exit;
}
pSmaInfo->offset = 0;
pSmaInfo->size = 0;
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
if (IS_VAR_DATA_TYPE(pDiskCol->bCol.type)) continue;
if (pDiskCol->bCol.flag == HAS_NULL || pDiskCol->bCol.flag == (HAS_NULL | HAS_NONE)) continue;
if (!pDiskCol->bCol.smaOn) continue;
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &pDiskCol->agg));
TSDB_CHECK_CODE(code, lino, _exit);
pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &pDiskCol->agg);
}
if (pSmaInfo->size) {
pSmaInfo->offset = pWriter->fSma.size;
code = tsdbWriteFile(pWriter->pSmaFD, pSmaInfo->offset, pWriter->aBuf[0], pSmaInfo->size);
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->fSma.size += pSmaInfo->size;
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int32_t code = 0; int32_t code = 0;
int64_t n; int64_t n;
...@@ -946,8 +1069,8 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo ...@@ -946,8 +1069,8 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
ASSERT(hdr.delimiter == TSDB_FILE_DLMT); ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
ASSERT(pBlockData->suid == hdr.suid); ASSERT(pBlockData->suid == hdr.suid);
ASSERT(pBlockData->uid == hdr.uid);
pBlockData->uid = hdr.uid;
pBlockData->nRow = hdr.nRow; pBlockData->nRow = hdr.nRow;
// uid // uid
......
...@@ -649,7 +649,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo ...@@ -649,7 +649,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -690,7 +690,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -690,7 +690,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
STColumn *pTColumn; STColumn *pTColumn;
int32_t iCol, jCol = 1; int32_t iCol, jCol = 1;
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) { for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol]; pTColumn = &pMerger->pTSchema->columns[iCol];
...@@ -744,7 +744,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -744,7 +744,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -770,7 +770,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { ...@@ -770,7 +770,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal); tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal);
...@@ -1105,9 +1105,8 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1105,9 +1105,8 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
pColVal = tRowIterNext(&rIter); pColVal = tRowIterNext(&rIter);
} }
} }
_exit:
pBlockData->nRow++; pBlockData->nRow++;
return code; return code;
_err: _err:
...@@ -1455,9 +1454,8 @@ _exit: ...@@ -1455,9 +1454,8 @@ _exit:
} }
// SDiskDataHdr ============================== // SDiskDataHdr ==============================
int32_t tPutDiskDataHdr(uint8_t *p, void *ph) { int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) {
int32_t n = 0; int32_t n = 0;
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
n += tPutU32(p ? p + n : p, pHdr->delimiter); n += tPutU32(p ? p + n : p, pHdr->delimiter);
n += tPutU32v(p ? p + n : p, pHdr->fmtVer); n += tPutU32v(p ? p + n : p, pHdr->fmtVer);
...@@ -1516,174 +1514,107 @@ int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) { ...@@ -1516,174 +1514,107 @@ int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) {
return n; return n;
} }
#define SMA_UPDATE(SUM_V, MIN_V, MAX_V, VAL, MINSET, MAXSET) \
do { \
(SUM_V) += (VAL); \
if (!(MINSET)) { \
(MIN_V) = (VAL); \
(MINSET) = 1; \
} else if ((MIN_V) > (VAL)) { \
(MIN_V) = (VAL); \
} \
if (!(MAXSET)) { \
(MAX_V) = (VAL); \
(MAXSET) = 1; \
} else if ((MAX_V) < (VAL)) { \
(MAX_V) = (VAL); \
} \
} while (0)
static FORCE_INLINE void tSmaUpdateBool(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
int8_t val = *(int8_t *)&pColVal->value.val ? 1 : 0;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateTinyint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
int8_t val = *(int8_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateSmallint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
int16_t val = *(int16_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateInt(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
int32_t val = *(int32_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateBigint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
int64_t val = *(int64_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateFloat(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
float val = *(float *)&pColVal->value.val;
SMA_UPDATE(*(double *)&pColAgg->sum, *(double *)&pColAgg->min, *(double *)&pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateDouble(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
double val = *(double *)&pColVal->value.val;
SMA_UPDATE(*(double *)&pColAgg->sum, *(double *)&pColAgg->min, *(double *)&pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUTinyint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
uint8_t val = *(uint8_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUSmallint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
uint16_t val = *(uint16_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUInt(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) {
uint32_t val = *(uint32_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
static FORCE_INLINE void tSmaUpdateUBigint(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet,
uint8_t *maxSet) {
uint64_t val = *(uint64_t *)&pColVal->value.val;
SMA_UPDATE(pColAgg->sum, pColAgg->min, pColAgg->max, val, *minSet, *maxSet);
}
void (*tSmaUpdateImpl[])(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet) = {
NULL,
tSmaUpdateBool, // TSDB_DATA_TYPE_BOOL
tSmaUpdateTinyint, // TSDB_DATA_TYPE_TINYINT
tSmaUpdateSmallint, // TSDB_DATA_TYPE_SMALLINT
tSmaUpdateInt, // TSDB_DATA_TYPE_INT
tSmaUpdateBigint, // TSDB_DATA_TYPE_BIGINT
tSmaUpdateFloat, // TSDB_DATA_TYPE_FLOAT
tSmaUpdateDouble, // TSDB_DATA_TYPE_DOUBLE
NULL, // TSDB_DATA_TYPE_VARCHAR
tSmaUpdateBigint, // TSDB_DATA_TYPE_TIMESTAMP
NULL, // TSDB_DATA_TYPE_NCHAR
tSmaUpdateUTinyint, // TSDB_DATA_TYPE_UTINYINT
tSmaUpdateUSmallint, // TSDB_DATA_TYPE_USMALLINT
tSmaUpdateUInt, // TSDB_DATA_TYPE_UINT
tSmaUpdateUBigint, // TSDB_DATA_TYPE_UBIGINT
NULL, // TSDB_DATA_TYPE_JSON
NULL, // TSDB_DATA_TYPE_VARBINARY
NULL, // TSDB_DATA_TYPE_DECIMAL
NULL, // TSDB_DATA_TYPE_BLOB
NULL, // TSDB_DATA_TYPE_MEDIUMBLOB
};
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
SColVal *pColVal = &colVal;
memset(pColAgg, 0, sizeof(*pColAgg));
bool minAssigned = false;
bool maxAssigned = false;
*pColAgg = (SColumnDataAgg){.colId = pColData->cid}; *pColAgg = (SColumnDataAgg){.colId = pColData->cid};
uint8_t minSet = 0;
uint8_t maxSet = 0;
SColVal cv;
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) { for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
tColDataGetValue(pColData, iVal, pColVal); tColDataGetValue(pColData, iVal, &cv);
if (!COL_VAL_IS_VALUE(pColVal)) { if (COL_VAL_IS_VALUE(&cv)) {
pColAgg->numOfNull++; tSmaUpdateImpl[pColData->type](pColAgg, &cv, &minSet, &maxSet);
} else { } else {
switch (pColData->type) { pColAgg->numOfNull++;
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
break;
case TSDB_DATA_TYPE_TINYINT: {
pColAgg->sum += colVal.value.i8;
if (!minAssigned || pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
pColAgg->sum += colVal.value.i16;
if (!minAssigned || pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32;
if (!minAssigned || pColAgg->min > colVal.value.i32) {
pColAgg->min = colVal.value.i32;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i32) {
pColAgg->max = colVal.value.i32;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
pColAgg->sum += colVal.value.i64;
if (!minAssigned || pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(double *)(&pColAgg->sum) += colVal.value.f;
if (!minAssigned || *(double *)(&pColAgg->min) > colVal.value.f) {
*(double *)(&pColAgg->min) = colVal.value.f;
minAssigned = true;
}
if (!maxAssigned || *(double *)(&pColAgg->max) < colVal.value.f) {
*(double *)(&pColAgg->max) = colVal.value.f;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double *)(&pColAgg->sum) += colVal.value.d;
if (!minAssigned || *(double *)(&pColAgg->min) > colVal.value.d) {
*(double *)(&pColAgg->min) = colVal.value.d;
minAssigned = true;
}
if (!maxAssigned || *(double *)(&pColAgg->max) < colVal.value.d) {
*(double *)(&pColAgg->max) = colVal.value.d;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_VARCHAR:
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (!minAssigned || pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
break;
case TSDB_DATA_TYPE_UTINYINT: {
pColAgg->sum += colVal.value.u8;
if (!minAssigned || pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
pColAgg->sum += colVal.value.u16;
if (!minAssigned || pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_UINT: {
pColAgg->sum += colVal.value.u32;
if (!minAssigned || pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
minAssigned = true;
}
if (!minAssigned || pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
pColAgg->sum += colVal.value.u64;
if (!minAssigned || pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
minAssigned = true;
}
if (!maxAssigned || pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_JSON:
break;
case TSDB_DATA_TYPE_VARBINARY:
break;
case TSDB_DATA_TYPE_DECIMAL:
break;
case TSDB_DATA_TYPE_BLOB:
break;
case TSDB_DATA_TYPE_MEDIUMBLOB:
break;
default:
ASSERT(0);
}
} }
} }
} }
......
...@@ -328,14 +328,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -328,14 +328,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) { if (!syncEnvIsStart()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId); vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
} }
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId); vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -394,7 +394,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -394,7 +394,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg rsp = {.code = code, .info = pMsg->info}; SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1; code = -1;
} }
...@@ -459,7 +459,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -459,7 +459,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg rsp = {.code = code, .info = pMsg->info}; SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1; code = -1;
} }
} }
...@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void ...@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId); vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
break; break;
} else { } else {
vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId); vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
taosMsleep(10); taosMsleep(10);
} }
} while (true); } while (true);
...@@ -683,7 +683,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { ...@@ -683,7 +683,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId); vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
break; break;
} else { } else {
vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId); vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
taosMsleep(10); taosMsleep(10);
} }
} while (true); } while (true);
......
...@@ -420,7 +420,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray ...@@ -420,7 +420,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
goto end; goto end;
} }
} else { } else {
metaGetTableTagsOpt(metaHandle, suid, uidList, tags); metaGetTableTagsByUids(metaHandle, suid, uidList, tags);
qInfo("succ to get table from meta idx, suid:%" PRIu64, suid); qInfo("succ to get table from meta idx, suid:%" PRIu64, suid);
} }
......
...@@ -41,9 +41,9 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S ...@@ -41,9 +41,9 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S
static bool processBlockWithProbability(const SSampleExecInfo* pInfo); static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smr, const char* dbname, static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
const char* tableName, int32_t* pNumOfRows, SMetaReader* smrChildTable, const char* dbname, const char* tableName,
const SSDataBlock* dataBlock); int32_t* pNumOfRows, const SSDataBlock* dataBlock);
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock); static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) { bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
...@@ -2415,11 +2415,10 @@ static bool sysTableIsOperatorCondOnOneTable(SNode* pCond, char* condTable) { ...@@ -2415,11 +2415,10 @@ static bool sysTableIsOperatorCondOnOneTable(SNode* pCond, char* condTable) {
strcasecmp(nodesGetNameFromColumnNode(node->pLeft), "table_name") == 0 && strcasecmp(nodesGetNameFromColumnNode(node->pLeft), "table_name") == 0 &&
nodeType(node->pRight) == QUERY_NODE_VALUE) { nodeType(node->pRight) == QUERY_NODE_VALUE) {
SValueNode* pValue = (SValueNode*)node->pRight; SValueNode* pValue = (SValueNode*)node->pRight;
if (pValue->node.type == TSDB_DATA_TYPE_NCHAR || pValue->node.type == TSDB_DATA_TYPE_VARCHAR || if (pValue->node.resType.type == TSDB_DATA_TYPE_NCHAR || pValue->node.resType.type == TSDB_DATA_TYPE_VARCHAR ||
pValue->node.type == TSDB_DATA_TYPE_BINARY) { pValue->node.resType.type == TSDB_DATA_TYPE_BINARY) {
char* value = nodesGetStrValueFromNode(pValue); char* value = nodesGetValueFromNode(pValue);
strncpy(condTable, value, TSDB_TABLE_NAME_LEN); strncpy(condTable, varDataVal(value), TSDB_TABLE_NAME_LEN);
taosMemoryFree(value);
return true; return true;
} }
} }
...@@ -2480,18 +2479,28 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -2480,18 +2479,28 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, condTableName); STR_TO_VARSTR(tableName, condTableName);
SMetaReader smr = {0}; SMetaReader smrChildTable = {0};
metaReaderInit(&smr, pInfo->readHandle.meta, 0); metaReaderInit(&smrChildTable, pInfo->readHandle.meta, 0);
metaGetTableEntryByName(&smr, condTableName); metaGetTableEntryByName(&smrChildTable, condTableName);
sysTableUserTagsFillOneTableTags(pInfo, &smr, dbname, tableName, &numOfRows, dataBlock); if (smrChildTable.me.type != TSDB_CHILD_TABLE) {
metaReaderClear(&smr); metaReaderClear(&smrChildTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
SMetaReader smrSuperTable = {0};
metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid);
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows, dataBlock);
metaReaderClear(&smrSuperTable);
metaReaderClear(&smrChildTable);
if (numOfRows > 0) { if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
numOfRows = 0; numOfRows = 0;
} }
blockDataDestroy(dataBlock); blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
...@@ -2508,23 +2517,22 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -2508,23 +2517,22 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
SMetaReader smr = {0}; SMetaReader smrSuperTable = {0};
metaReaderInit(&smr, pInfo->readHandle.meta, 0); metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&smr, suid); int32_t code = metaGetTableEntryByUid(&smrSuperTable, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
metaReaderClear(&smr); metaReaderClear(&smrSuperTable);
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }
sysTableUserTagsFillOneTableTags(pInfo, &smr, dbname, tableName, &numOfRows, dataBlock); sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);
metaReaderClear(&smr); metaReaderClear(&smrSuperTable);
if (numOfRows >= pOperator->resultInfo.capacity) { if (numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
...@@ -2562,15 +2570,15 @@ static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t ...@@ -2562,15 +2570,15 @@ static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t
blockDataCleanup(dataBlock); blockDataCleanup(dataBlock);
} }
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smr, const char* dbname, static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
const char* tableName, int32_t* pNumOfRows, SMetaReader* smrChildTable, const char* dbname, const char* tableName,
const SSDataBlock* dataBlock) { int32_t* pNumOfRows, const SSDataBlock* dataBlock) {
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(stableName, (*smr).me.name); STR_TO_VARSTR(stableName, (*smrSuperTable).me.name);
int32_t numOfRows = *pNumOfRows; int32_t numOfRows = *pNumOfRows;
int32_t numOfTags = (*smr).me.stbEntry.schemaTag.nCols; int32_t numOfTags = (*smrSuperTable).me.stbEntry.schemaTag.nCols;
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
SColumnInfoData* pColInfoData = NULL; SColumnInfoData* pColInfoData = NULL;
...@@ -2588,35 +2596,35 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, ...@@ -2588,35 +2596,35 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
// tag name // tag name
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tagName, (*smr).me.stbEntry.schemaTag.pSchema[i].name); STR_TO_VARSTR(tagName, (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].name);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3); pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, tagName, false); colDataAppend(pColInfoData, numOfRows, tagName, false);
// tag type // tag type
int8_t tagType = (*smr).me.stbEntry.schemaTag.pSchema[i].type; int8_t tagType = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].type;
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4); pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
char tagTypeStr[VARSTR_HEADER_SIZE + 32]; char tagTypeStr[VARSTR_HEADER_SIZE + 32];
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
if (tagType == TSDB_DATA_TYPE_VARCHAR) { if (tagType == TSDB_DATA_TYPE_VARCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
(int32_t)((*smr).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); (int32_t)((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
} else if (tagType == TSDB_DATA_TYPE_NCHAR) { } else if (tagType == TSDB_DATA_TYPE_NCHAR) {
tagTypeLen += tagTypeLen += sprintf(
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
(int32_t)(((*smr).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); (int32_t)(((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
varDataSetLen(tagTypeStr, tagTypeLen); varDataSetLen(tagTypeStr, tagTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false); colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = (*smr).me.stbEntry.schemaTag.pSchema[i].colId; tagVal.cid = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].colId;
char* tagData = NULL; char* tagData = NULL;
uint32_t tagLen = 0; uint32_t tagLen = 0;
if (tagType == TSDB_DATA_TYPE_JSON) { if (tagType == TSDB_DATA_TYPE_JSON) {
tagData = (char*)pInfo->pCur->mr.me.ctbEntry.pTags; tagData = (char*)smrChildTable->me.ctbEntry.pTags;
} else { } else {
bool exist = tTagGet((STag*)pInfo->pCur->mr.me.ctbEntry.pTags, &tagVal); bool exist = tTagGet((STag*)smrChildTable->me.ctbEntry.pTags, &tagVal);
if (exist) { if (exist) {
if (IS_VAR_DATA_TYPE(tagType)) { if (IS_VAR_DATA_TYPE(tagType)) {
tagData = (char*)tagVal.pData; tagData = (char*)tagVal.pData;
......
...@@ -1402,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3 ...@@ -1402,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.skey <= endTsCols[i]) { do {
uint64_t winGpId = pGpDatas[i]; uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
...@@ -1413,7 +1413,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3 ...@@ -1413,7 +1413,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
} }
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
} } while (win.ekey <= endTsCols[i]);
} }
} }
...@@ -3067,8 +3067,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -3067,8 +3067,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
int32_t startPos = 0; int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = STimeWindow nextWin = {0};
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); if (IS_FINAL_OP(pInfo)) {
nextWin = getFinalTimeWindow(ts, &pInfo->interval);
} else {
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
}
while (1) { while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
...@@ -3122,8 +3126,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -3122,8 +3126,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, if (IS_FINAL_OP(pInfo)) {
TSDB_ORDER_ASC); forwardRows = 1;
} else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap); saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
} }
......
...@@ -468,6 +468,7 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP ...@@ -468,6 +468,7 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
indexMultiTermQueryAdd(mtm, tm, qtype); indexMultiTermQueryAdd(mtm, tm, qtype);
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result); ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
indexMultiTermQueryDestroy(mtm);
} else { } else {
bool reverse; bool reverse;
FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse); FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse);
...@@ -647,9 +648,8 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { ...@@ -647,9 +648,8 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIF_ERR_RET(sifInitOperParams(&params, node, ctx)); SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
if (params[0].status == SFLT_NOT_INDEX && (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) { if (params[0].status == SFLT_NOT_INDEX && (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
for (int i = 0; i < nParam; i++) sifFreeParam(&params[i]);
output->status = SFLT_NOT_INDEX; output->status = SFLT_NOT_INDEX;
return code; goto _return;
} }
// ugly code, refactor later // ugly code, refactor later
......
...@@ -1025,6 +1025,7 @@ void fstDestroy(Fst* fst) { ...@@ -1025,6 +1025,7 @@ void fstDestroy(Fst* fst) {
} }
bool fstGet(Fst* fst, FstSlice* b, Output* out) { bool fstGet(Fst* fst, FstSlice* b, Output* out) {
int ret = false;
FstNode* root = fstGetRoot(fst); FstNode* root = fstGetRoot(fst);
Output tOut = 0; Output tOut = 0;
int32_t len; int32_t len;
...@@ -1037,7 +1038,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { ...@@ -1037,7 +1038,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
uint8_t inp = data[i]; uint8_t inp = data[i];
Output res = 0; Output res = 0;
if (false == fstNodeFindInput(root, inp, &res)) { if (false == fstNodeFindInput(root, inp, &res)) {
return false; goto _return;
} }
FstTransition trn; FstTransition trn;
...@@ -1047,18 +1048,20 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { ...@@ -1047,18 +1048,20 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
taosArrayPush(nodes, &root); taosArrayPush(nodes, &root);
} }
if (!FST_NODE_IS_FINAL(root)) { if (!FST_NODE_IS_FINAL(root)) {
return false; goto _return;
} else { } else {
tOut = tOut + FST_NODE_FINAL_OUTPUT(root); tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
ret = true;
} }
_return:
for (int32_t i = 0; i < taosArrayGetSize(nodes); i++) { for (int32_t i = 0; i < taosArrayGetSize(nodes); i++) {
FstNode** node = (FstNode**)taosArrayGet(nodes, i); FstNode** node = (FstNode**)taosArrayGet(nodes, i);
fstNodeDestroy(*node); fstNodeDestroy(*node);
} }
taosArrayDestroy(nodes); taosArrayDestroy(nodes);
*out = tOut; *out = tOut;
return true; return ret;
} }
FStmBuilder* fstSearch(Fst* fst, FAutoCtx* ctx) { FStmBuilder* fstSearch(Fst* fst, FAutoCtx* ctx) {
// refactor later // refactor later
...@@ -1243,6 +1246,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { ...@@ -1243,6 +1246,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState}; StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState};
taosArrayPush(sws->stack, &s); taosArrayPush(sws->stack, &s);
taosMemoryFree(trans);
return true; return true;
} }
} }
......
...@@ -139,6 +139,7 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r ...@@ -139,6 +139,7 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
} }
} }
if (taosArrayGetSize(tinsts) == 0) { if (taosArrayGetSize(tinsts) == 0) {
taosArrayDestroy(tinsts);
return false; return false;
} }
uint32_t *v = taosHashGet(builder->cache, &tinsts, sizeof(POINTER_BYTES)); uint32_t *v = taosHashGet(builder->cache, &tinsts, sizeof(POINTER_BYTES));
......
...@@ -267,10 +267,12 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { ...@@ -267,10 +267,12 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i); uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
if (ret != 0) { if (ret != 0) {
taosArrayDestroy(offsets);
indexError("failed to find target tablelist"); indexError("failed to find target tablelist");
return TSDB_CODE_TDB_FILE_CORRUPTED; return TSDB_CODE_TDB_FILE_CORRUPTED;
} }
} }
taosArrayDestroy(offsets);
return 0; return 0;
} }
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
...@@ -336,6 +338,7 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, ...@@ -336,6 +338,7 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr,
} }
stmStDestroy(st); stmStDestroy(st);
stmBuilderDestroy(sb); stmBuilderDestroy(sb);
taosArrayDestroy(offsets);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
...@@ -379,6 +382,7 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { ...@@ -379,6 +382,7 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
", size: %d, time cost: %" PRIu64 "us", ", size: %d, time cost: %" PRIu64 "us",
tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost); tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost);
} }
taosMemoryFree(p);
fstSliceDestroy(&key); fstSliceDestroy(&key);
return 0; return 0;
} }
...@@ -471,6 +475,9 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt ...@@ -471,6 +475,9 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
} }
stmStDestroy(st); stmStDestroy(st);
stmBuilderDestroy(sb); stmBuilderDestroy(sb);
taosArrayDestroy(offsets);
taosMemoryFree(p);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) {
...@@ -898,9 +905,8 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -898,9 +905,8 @@ static int tfileReaderLoadFst(TFileReader* reader) {
int64_t ts = taosGetTimestampUs(); int64_t ts = taosGetTimestampUs();
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
int64_t cost = taosGetTimestampUs() - ts; int64_t cost = taosGetTimestampUs() - ts;
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64 indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
"us", reader->header.fstOffset, fstSize, ctx->file.buf, size, cost);
nread, reader->header.fstOffset, fstSize, ctx->file.buf, size, cost);
// we assuse fst size less than FST_MAX_SIZE // we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread <= fstSize); assert(nread > 0 && nread <= fstSize);
...@@ -989,6 +995,7 @@ static SArray* tfileGetFileList(const char* path) { ...@@ -989,6 +995,7 @@ static SArray* tfileGetFileList(const char* path) {
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
if (NULL == pDir) { if (NULL == pDir) {
taosArrayDestroy(files);
return NULL; return NULL;
} }
TdDirEntryPtr pDirEntry; TdDirEntryPtr pDirEntry;
......
...@@ -91,6 +91,7 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta); ...@@ -91,6 +91,7 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta); STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq); int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache, int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
......
...@@ -125,6 +125,23 @@ static bool needGetTableIndex(SNode* pStmt) { ...@@ -125,6 +125,23 @@ static bool needGetTableIndex(SNode* pStmt) {
return false; return false;
} }
static int32_t collectMetaKeyFromInsTagsImpl(SCollectMetaKeyCxt* pCxt, SName* pName) {
if (TSDB_DB_NAME_T == pName->type) {
return reserveDbVgInfoInCache(pName->acctId, pName->dbname, pCxt->pMetaCache);
}
return reserveTableVgroupInCacheExt(pName, pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromInsTags(SCollectMetaKeyCxt* pCxt) {
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pStmt;
SName name = {0};
int32_t code = getInsTagsTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &name);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromInsTagsImpl(pCxt, &name);
}
return code;
}
static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const char* pDb, const char* pTable, static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const char* pDb, const char* pTable,
AUTH_TYPE authType) { AUTH_TYPE authType) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pDb, pTable, pCxt->pMetaCache); int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pDb, pTable, pCxt->pMetaCache);
...@@ -143,6 +160,10 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const c ...@@ -143,6 +160,10 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const c
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES))) { if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES))) {
code = reserveDnodeRequiredInCache(pCxt->pMetaCache); code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
} }
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) &&
QUERY_NODE_SELECT_STMT == nodeType(pCxt->pStmt)) {
code = collectMetaKeyFromInsTags(pCxt);
}
return code; return code;
} }
......
...@@ -2181,7 +2181,11 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge ...@@ -2181,7 +2181,11 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge
} }
if (TSDB_DB_NAME_T == pTargetName->type) { if (TSDB_DB_NAME_T == pTargetName->type) {
return getDBVgInfoImpl(pCxt, pTargetName, pVgroupList); int32_t code = getDBVgInfoImpl(pCxt, pTargetName, pVgroupList);
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
code = TSDB_CODE_SUCCESS;
}
return code;
} }
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
...@@ -2190,86 +2194,22 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge ...@@ -2190,86 +2194,22 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge
*pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo)); *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo));
if (NULL == *pVgroupList) { if (NULL == *pVgroupList) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} else {
taosArrayPush(*pVgroupList, &vgInfo);
} }
} } else if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
if (TSDB_CODE_SUCCESS == code) { code = TSDB_CODE_SUCCESS;
taosArrayPush(*pVgroupList, &vgInfo);
} }
return code; return code;
} }
static int32_t getTagsTableTargetNameFromOp(STranslateContext* pCxt, SOperatorNode* pOper, SName* pName) {
if (OP_TYPE_EQUAL != pOper->opType) {
return TSDB_CODE_SUCCESS;
}
SColumnNode* pCol = NULL;
SValueNode* pVal = NULL;
if (QUERY_NODE_COLUMN == nodeType(pOper->pLeft)) {
pCol = (SColumnNode*)pOper->pLeft;
} else if (QUERY_NODE_VALUE == nodeType(pOper->pLeft)) {
pVal = (SValueNode*)pOper->pLeft;
}
if (QUERY_NODE_COLUMN == nodeType(pOper->pRight)) {
pCol = (SColumnNode*)pOper->pRight;
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
pVal = (SValueNode*)pOper->pRight;
}
if (NULL == pCol || NULL == pVal) {
return TSDB_CODE_SUCCESS;
}
if (0 == strcmp(pCol->colName, "db_name")) {
return tNameSetDbName(pName, pCxt->pParseCxt->acctId, pVal->literal, strlen(pVal->literal));
} else if (0 == strcmp(pCol->colName, "table_name")) {
return tNameAddTbName(pName, pVal->literal, strlen(pVal->literal));
}
return TSDB_CODE_SUCCESS;
}
static void getTagsTableTargetObjName(STranslateContext* pCxt, SNode* pNode, SName* pName) {
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
getTagsTableTargetNameFromOp(pCxt, (SOperatorNode*)pNode, pName);
}
}
static int32_t getTagsTableTargetNameFromCond(STranslateContext* pCxt, SLogicConditionNode* pCond, SName* pName) {
if (LOGIC_COND_TYPE_AND != pCond->condType) {
return TSDB_CODE_SUCCESS;
}
SNode* pNode = NULL;
FOREACH(pNode, pCond->pParameterList) { getTagsTableTargetObjName(pCxt, pNode, pName); }
if ('\0' == pName->dbname[0]) {
pName->type = 0;
}
return TSDB_CODE_SUCCESS;
}
static int32_t getTagsTableTargetName(STranslateContext* pCxt, SNode* pWhere, SName* pName) {
if (NULL == pWhere) {
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_OPERATOR == nodeType(pWhere)) {
return getTagsTableTargetNameFromOp(pCxt, (SOperatorNode*)pWhere, pName);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pWhere)) {
return getTagsTableTargetNameFromCond(pCxt, (SLogicConditionNode*)pWhere, pName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t getTagsTableVgroupList(STranslateContext* pCxt, SName* pName, SArray** pVgroupList) { static int32_t getTagsTableVgroupList(STranslateContext* pCxt, SName* pName, SArray** pVgroupList) {
if (!isSelectStmt(pCxt->pCurrStmt)) { if (!isSelectStmt(pCxt->pCurrStmt)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SName targetName = {0}; SName targetName = {0};
int32_t code = getTagsTableTargetName(pCxt, pSelect->pWhere, &targetName); int32_t code = getInsTagsTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &targetName);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getTagsTableVgroupListImpl(pCxt, &targetName, pName, pVgroupList); code = getTagsTableVgroupListImpl(pCxt, &targetName, pName, pVgroupList);
} }
......
...@@ -420,6 +420,75 @@ end: ...@@ -420,6 +420,75 @@ end:
return retCode; return retCode;
} }
static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pOper, SName* pName) {
if (OP_TYPE_EQUAL != pOper->opType) {
return TSDB_CODE_SUCCESS;
}
SColumnNode* pCol = NULL;
SValueNode* pVal = NULL;
if (QUERY_NODE_COLUMN == nodeType(pOper->pLeft)) {
pCol = (SColumnNode*)pOper->pLeft;
} else if (QUERY_NODE_VALUE == nodeType(pOper->pLeft)) {
pVal = (SValueNode*)pOper->pLeft;
}
if (QUERY_NODE_COLUMN == nodeType(pOper->pRight)) {
pCol = (SColumnNode*)pOper->pRight;
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
pVal = (SValueNode*)pOper->pRight;
}
if (NULL == pCol || NULL == pVal) {
return TSDB_CODE_SUCCESS;
}
if (0 == strcmp(pCol->colName, "db_name")) {
return tNameSetDbName(pName, acctId, pVal->literal, strlen(pVal->literal));
} else if (0 == strcmp(pCol->colName, "table_name")) {
return tNameAddTbName(pName, pVal->literal, strlen(pVal->literal));
}
return TSDB_CODE_SUCCESS;
}
static void getInsTagsTableTargetObjName(int32_t acctId, SNode* pNode, SName* pName) {
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
getInsTagsTableTargetNameFromOp(acctId, (SOperatorNode*)pNode, pName);
}
}
static int32_t getInsTagsTableTargetNameFromCond(int32_t acctId, SLogicConditionNode* pCond, SName* pName) {
if (LOGIC_COND_TYPE_AND != pCond->condType) {
return TSDB_CODE_SUCCESS;
}
SNode* pNode = NULL;
FOREACH(pNode, pCond->pParameterList) { getInsTagsTableTargetObjName(acctId, pNode, pName); }
if ('\0' == pName->dbname[0]) {
pName->type = 0;
}
return TSDB_CODE_SUCCESS;
}
int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName) {
if (NULL == pWhere) {
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_OPERATOR == nodeType(pWhere)) {
int32_t code = getInsTagsTableTargetNameFromOp(acctId, (SOperatorNode*)pWhere, pName);
if (TSDB_CODE_SUCCESS == code && '\0' == pName->dbname[0]) {
pName->type = 0;
}
return code;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pWhere)) {
return getInsTagsTableTargetNameFromCond(acctId, (SLogicConditionNode*)pWhere, pName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, char* pStr) { static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, char* pStr) {
return sprintf(pStr, "%s*%d.%s*%d", pUser, acctId, pDb, type); return sprintf(pStr, "%s*%d.%s*%d", pUser, acctId, pDb, type);
} }
...@@ -1173,7 +1242,7 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { ...@@ -1173,7 +1242,7 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
taosArrayDestroy(p->pTableVgroupReq); taosArrayDestroy(p->pTableVgroupReq);
p = taosHashIterate(pMetaCache->pInsertTables, p); p = taosHashIterate(pMetaCache->pInsertTables, p);
} }
taosHashCleanup(pMetaCache->pInsertTables); taosHashCleanup(pMetaCache->pInsertTables);
taosHashCleanup(pMetaCache->pDbVgroup); taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pDbCfg); taosHashCleanup(pMetaCache->pDbCfg);
......
...@@ -420,9 +420,11 @@ TEST_F(ParserSelectTest, setOperatorSemanticCheck) { ...@@ -420,9 +420,11 @@ TEST_F(ParserSelectTest, setOperatorSemanticCheck) {
} }
TEST_F(ParserSelectTest, informationSchema) { TEST_F(ParserSelectTest, informationSchema) {
useDb("root", "test"); useDb("root", "information_schema");
run("SELECT * FROM ins_databases WHERE name = 'information_schema'");
run("SELECT * FROM information_schema.ins_databases WHERE name = 'information_schema'"); run("SELECT * FROM ins_tags WHERE db_name = 'test' and table_name = 'st1'");
} }
TEST_F(ParserSelectTest, withoutFrom) { TEST_F(ParserSelectTest, withoutFrom) {
......
...@@ -53,7 +53,7 @@ class ParserEnv : public testing::Environment { ...@@ -53,7 +53,7 @@ class ParserEnv : public testing::Environment {
private: private:
void initLog(const char* path) { void initLog(const char* path) {
int32_t logLevel = getLogLevel(); int32_t logLevel = getLogLevel() | DEBUG_SCREEN;
dDebugFlag = logLevel; dDebugFlag = logLevel;
vDebugFlag = logLevel; vDebugFlag = logLevel;
mDebugFlag = logLevel; mDebugFlag = logLevel;
......
...@@ -48,7 +48,7 @@ class PlannerEnv : public testing::Environment { ...@@ -48,7 +48,7 @@ class PlannerEnv : public testing::Environment {
private: private:
void initLog(const char* path) { void initLog(const char* path) {
int32_t logLevel = getLogLevel(); int32_t logLevel = getLogLevel() | DEBUG_SCREEN;
dDebugFlag = logLevel; dDebugFlag = logLevel;
vDebugFlag = logLevel; vDebugFlag = logLevel;
mDebugFlag = logLevel; mDebugFlag = logLevel;
......
...@@ -1347,7 +1347,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1347,7 +1347,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(&pCtx->epSet);
} else { } else {
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) { if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) {
tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn)); tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn), pConn);
} }
} }
addConnToPool(pThrd->pool, pConn); addConnToPool(pThrd->pool, pConn);
......
...@@ -509,7 +509,7 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) { ...@@ -509,7 +509,7 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
if (heapSize(queue->heap) != 0) { if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap); HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return; if (minNode == NULL) return;
uint64_t now = taosGetTimestampMs(); uint64_t now = taosGetTimestampMs();
SDelayTask* task = container_of(minNode, SDelayTask, node); SDelayTask* task = container_of(minNode, SDelayTask, node);
......
...@@ -284,7 +284,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -284,7 +284,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
return; return;
} else { } else {
tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:", transLabel(pTransInst), tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pTransInst),
conn, conn->dst, conn->src); conn, conn->dst, conn->src);
destroyConn(conn, true); destroyConn(conn, true);
return; return;
...@@ -952,10 +952,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -952,10 +952,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
#ifdef WINDOWS #ifdef WINDOWS
char pipeName[64]; char pipeName[64];
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-" PRIu64, taosSafeRand(), GetCurrentProcessId()); snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId());
#else #else
char pipeName[PATH_MAX] = {0}; char pipeName[PATH_MAX] = {0};
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
taosGetSelfPthreadId()); taosGetSelfPthreadId());
#endif #endif
ret = uv_pipe_bind(&srv->pipeListen, pipeName); ret = uv_pipe_bind(&srv->pipeListen, pipeName);
......
...@@ -801,7 +801,6 @@ TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { ...@@ -801,7 +801,6 @@ TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
if (taosKeepTcpAlive(pSocket) < 0) { if (taosKeepTcpAlive(pSocket) < 0) {
// printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno)); // printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL; return NULL;
} }
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册