提交 26adff1f 编写于 作者: S Shengliang Guan

minor changes

上级 4fe44df5
...@@ -43,50 +43,46 @@ ...@@ -43,50 +43,46 @@
* An encoding of midnight at the end of the day as 24:00:00 - ie. midnight * An encoding of midnight at the end of the day as 24:00:00 - ie. midnight
* tomorrow - (allowable under ISO 8601) is supported. * tomorrow - (allowable under ISO 8601) is supported.
*/ */
static int64_t user_mktime64(const unsigned int year0, const unsigned int mon0, static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const uint32_t day, const uint32_t hour,
const unsigned int day, const unsigned int hour, const uint32_t min, const uint32_t sec, int64_t time_zone) {
const unsigned int min, const unsigned int sec, int64_t time_zone) uint32_t mon = mon0, year = year0;
{
unsigned int mon = mon0, year = year0;
/* 1..12 -> 11,12,1..10 */ /* 1..12 -> 11,12,1..10 */
if (0 >= (int) (mon -= 2)) { if (0 >= (int32_t)(mon -= 2)) {
mon += 12; /* Puts Feb last since it has leap day */ mon += 12; /* Puts Feb last since it has leap day */
year -= 1; year -= 1;
} }
//int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) + // int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec); // year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
int64_t res; int64_t res;
res = 367*((int64_t)mon)/12; res = 367 * ((int64_t)mon) / 12;
res += year/4 - year/100 + year/400 + day + ((int64_t)year)*365 - 719499; res += year / 4 - year / 100 + year / 400 + day + ((int64_t)year) * 365 - 719499;
res = res*24; res = res * 24;
res = ((res + hour) * 60 + min) * 60 + sec; res = ((res + hour) * 60 + min) * 60 + sec;
return (res + time_zone); return (res + time_zone);
} }
// ==== mktime() kernel code =================// // ==== mktime() kernel code =================//
static int64_t m_deltaUtc = 0; static int64_t m_deltaUtc = 0;
void deltaToUtcInitOnce() { void deltaToUtcInitOnce() {
struct tm tm = {0}; struct tm tm = {0};
(void)strptime("1970-01-01 00:00:00", (const char *)("%Y-%m-%d %H:%M:%S"), &tm); (void)strptime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)mktime(&tm); m_deltaUtc = (int64_t)mktime(&tm);
//printf("====delta:%lld\n\n", seconds); // printf("====delta:%lld\n\n", seconds);
} }
static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec);
static char* forwardToTimeStringEnd(char* str); static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(const char *str, int32_t len); static bool checkTzPresent(const char* str, int32_t len);
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = { static int32_t (*parseLocaltimeFp[])(char* timestr, int64_t* time, int32_t timePrec) = {parseLocaltime,
parseLocaltime, parseLocaltimeDst};
parseLocaltimeDst
};
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
...@@ -103,9 +99,9 @@ bool checkTzPresent(const char* str, int32_t len) { ...@@ -103,9 +99,9 @@ bool checkTzPresent(const char* str, int32_t len) {
char* seg = forwardToTimeStringEnd((char*)str); char* seg = forwardToTimeStringEnd((char*)str);
int32_t seg_len = len - (int32_t)(seg - str); int32_t seg_len = len - (int32_t)(seg - str);
char *c = &seg[seg_len - 1]; char* c = &seg[seg_len - 1];
for (int i = 0; i < seg_len; ++i) { for (int32_t i = 0; i < seg_len; ++i) {
if (*c == 'Z' || *c == 'z' || *c == '+' || *c == '-') { if (*c == 'Z' || *c == 'z' || *c == '+' || *c == '-') {
return true; return true;
} }
c--; c--;
...@@ -199,13 +195,12 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { ...@@ -199,13 +195,12 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
i += 2; i += 2;
} }
//return error if there're illegal charaters after min(2 Digits) // return error if there're illegal charaters after min(2 Digits)
char *minStr = &str[i]; char* minStr = &str[i];
if (minStr[1] != '\0' && minStr[2] != '\0') { if (minStr[1] != '\0' && minStr[2] != '\0') {
return -1; return -1;
} }
int64_t minute = strnatoi(&str[i], 2); int64_t minute = strnatoi(&str[i], 2);
if (minute > 59) { if (minute > 59) {
return -1; return -1;
...@@ -233,9 +228,8 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { ...@@ -233,9 +228,8 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
* 2013-04-12T15:52:01.123+0800 * 2013-04-12T15:52:01.123+0800
*/ */
int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) { int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) {
int64_t factor =
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
int64_t tzOffset = 0; int64_t tzOffset = 0;
struct tm tm = {0}; struct tm tm = {0};
...@@ -255,8 +249,8 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch ...@@ -255,8 +249,8 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch
/* mktime will be affected by TZ, set by using taos_options */ /* mktime will be affected by TZ, set by using taos_options */
#ifdef WINDOWS #ifdef WINDOWS
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, 0); int64_t seconds = user_mktime64(tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, 0);
//int64_t seconds = gmtime(&tm); // int64_t seconds = gmtime(&tm);
#else #else
int64_t seconds = timegm(&tm); int64_t seconds = timegm(&tm);
#endif #endif
...@@ -320,8 +314,9 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -320,8 +314,9 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
#endif #endif
#endif #endif
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, timezone); int64_t seconds =
user_mktime64(tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, timezone);
int64_t fraction = 0; int64_t fraction = 0;
if (*str == '.') { if (*str == '.') {
...@@ -331,8 +326,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -331,8 +326,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
} }
} }
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : int64_t factor =
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
*time = factor * seconds + fraction; *time = factor * seconds + fraction;
return 0; return 0;
...@@ -350,7 +345,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -350,7 +345,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
/* mktime will be affected by TZ, set by using taos_options */ /* mktime will be affected by TZ, set by using taos_options */
int64_t seconds = mktime(&tm); int64_t seconds = mktime(&tm);
int64_t fraction = 0; int64_t fraction = 0;
if (*str == '.') { if (*str == '.') {
...@@ -360,27 +355,22 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -360,27 +355,22 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
} }
} }
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : int64_t factor =
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
*time = factor * seconds + fraction; *time = factor * seconds + fraction;
return 0; return 0;
} }
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) { int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO); fromPrecision == TSDB_TIME_PRECISION_NANO);
assert(toPrecision == TSDB_TIME_PRECISION_MILLI || assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_NANO); toPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3][3] = { {1., 1000., 1000000.}, static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
{1.0 / 1000, 1., 1000.},
{1.0 / 1000000, 1.0 / 1000, 1.} };
return (int64_t)((double)time * factors[fromPrecision][toPrecision]); return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
} }
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) { static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
switch (unit) { switch (unit) {
case 's': case 's':
(*result) = convertTimePrecision(val * MILLISECOND_PER_SECOND, TSDB_TIME_PRECISION_MILLI, timePrecision); (*result) = convertTimePrecision(val * MILLISECOND_PER_SECOND, TSDB_TIME_PRECISION_MILLI, timePrecision);
...@@ -427,7 +417,8 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time ...@@ -427,7 +417,8 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time
* d - Days (24 hours) * d - Days (24 hours)
* w - Weeks (7 days) * w - Weeks (7 days)
*/ */
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) { int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit,
int32_t timePrecision) {
errno = 0; errno = 0;
char* endPtr = NULL; char* endPtr = NULL;
...@@ -474,9 +465,9 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { ...@@ -474,9 +465,9 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
} }
struct tm tm; struct tm tm;
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
localtime_r(&tt, &tm); localtime_r(&tt, &tm);
int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration; int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)duration;
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
...@@ -497,13 +488,13 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char ...@@ -497,13 +488,13 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
struct tm tm; struct tm tm;
time_t t = (time_t)skey; time_t t = (time_t)skey;
localtime_r(&t, &tm); localtime_r(&t, &tm);
int smon = tm.tm_year * 12 + tm.tm_mon; int32_t smon = tm.tm_year * 12 + tm.tm_mon;
t = (time_t)ekey; t = (time_t)ekey;
localtime_r(&t, &tm); localtime_r(&t, &tm);
int emon = tm.tm_year * 12 + tm.tm_mon; int32_t emon = tm.tm_year * 12 + tm.tm_mon;
if (unit == 'y') { if (unit == 'y') {
interval *= 12; interval *= 12;
...@@ -522,7 +513,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -522,7 +513,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if (pInterval->slidingUnit == 'n' || pInterval->slidingUnit == 'y') { if (pInterval->slidingUnit == 'n' || pInterval->slidingUnit == 'y') {
start /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); start /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
struct tm tm; struct tm tm;
time_t tt = (time_t)start; time_t tt = (time_t)start;
localtime_r(&tt, &tm); localtime_r(&tt, &tm);
tm.tm_sec = 0; tm.tm_sec = 0;
tm.tm_min = 0; tm.tm_min = 0;
...@@ -531,10 +522,10 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -531,10 +522,10 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if (pInterval->slidingUnit == 'y') { if (pInterval->slidingUnit == 'y') {
tm.tm_mon = 0; tm.tm_mon = 0;
tm.tm_year = (int)(tm.tm_year / pInterval->sliding * pInterval->sliding); tm.tm_year = (int32_t)(tm.tm_year / pInterval->sliding * pInterval->sliding);
} else { } else {
int mon = tm.tm_year * 12 + tm.tm_mon; int32_t mon = tm.tm_year * 12 + tm.tm_mon;
mon = (int)(mon / pInterval->sliding * pInterval->sliding); mon = (int32_t)(mon / pInterval->sliding * pInterval->sliding);
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
} }
...@@ -547,17 +538,17 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -547,17 +538,17 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
start = (delta / pInterval->sliding + factor) * pInterval->sliding; start = (delta / pInterval->sliding + factor) * pInterval->sliding;
if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') { if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') {
/* /*
* here we revised the start time of day according to the local time zone, * here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided. * but in case of DST, the start time of one day need to be dynamically decided.
*/ */
// todo refactor to extract function that is available for Linux/Windows/Mac platform // todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900 #if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone; int64_t timezone = _timezone;
int32_t daylight = _daylight; int32_t daylight = _daylight;
char** tzname = _tzname; char** tzname = _tzname;
#endif #endif
start += (int64_t)(timezone * TSDB_TICK_PER_SECOND(precision)); start += (int64_t)(timezone * TSDB_TICK_PER_SECOND(precision));
} }
...@@ -568,7 +559,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -568,7 +559,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) { if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
end = start + pInterval->interval - 1; end = start + pInterval->interval - 1;
while(end < t && ((start + pInterval->sliding) <= INT64_MAX)) { // move forward to the correct time window while (end < t && ((start + pInterval->sliding) <= INT64_MAX)) { // move forward to the correct time window
start += pInterval->sliding; start += pInterval->sliding;
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) { if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
...@@ -601,8 +592,8 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -601,8 +592,8 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
// and the parameter can also be a variable. // and the parameter can also be a variable.
const char* fmtts(int64_t ts) { const char* fmtts(int64_t ts) {
static char buf[96]; static char buf[96];
size_t pos = 0; size_t pos = 0;
struct tm tm; struct tm tm;
if (ts > -62135625943 && ts < 32503651200) { if (ts > -62135625943 && ts < 32503651200) {
time_t t = (time_t)ts; time_t t = (time_t)ts;
...@@ -619,7 +610,7 @@ const char* fmtts(int64_t ts) { ...@@ -619,7 +610,7 @@ const char* fmtts(int64_t ts) {
buf[pos++] = ' '; buf[pos++] = ' ';
} }
pos += strftime(buf + pos, sizeof(buf), "ms=%Y-%m-%d %H:%M:%S", &tm); pos += strftime(buf + pos, sizeof(buf), "ms=%Y-%m-%d %H:%M:%S", &tm);
pos += sprintf(buf + pos, ".%03d", (int)(ts % 1000)); pos += sprintf(buf + pos, ".%03d", (int32_t)(ts % 1000));
} }
{ {
...@@ -631,7 +622,7 @@ const char* fmtts(int64_t ts) { ...@@ -631,7 +622,7 @@ const char* fmtts(int64_t ts) {
buf[pos++] = ' '; buf[pos++] = ' ';
} }
pos += strftime(buf + pos, sizeof(buf), "us=%Y-%m-%d %H:%M:%S", &tm); pos += strftime(buf + pos, sizeof(buf), "us=%Y-%m-%d %H:%M:%S", &tm);
pos += sprintf(buf + pos, ".%06d", (int)(ts % 1000000)); pos += sprintf(buf + pos, ".%06d", (int32_t)(ts % 1000000));
} }
return buf; return buf;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "tcompression.h" #include "tcompression.h"
static int32_t getDataStartOffset(); static int32_t getDataStartOffset();
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo); static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
...@@ -36,7 +36,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { ...@@ -36,7 +36,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
} }
pTSBuf->autoDelete = autoDelete; pTSBuf->autoDelete = autoDelete;
taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path); taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path);
// pTSBuf->pFile = fopen(pTSBuf->path, "wb+"); // pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
...@@ -48,20 +48,20 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { ...@@ -48,20 +48,20 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
if (!autoDelete) { if (!autoDelete) {
remove(pTSBuf->path); remove(pTSBuf->path);
} }
if (NULL == allocResForTSBuf(pTSBuf)) { if (NULL == allocResForTSBuf(pTSBuf)) {
return NULL; return NULL;
} }
// update the header info // update the header info
STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC}; STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
pTSBuf->tsOrder = order; pTSBuf->tsOrder = order;
return pTSBuf; return pTSBuf;
} }
...@@ -72,23 +72,23 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -72,23 +72,23 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
} }
pTSBuf->autoDelete = autoDelete; pTSBuf->autoDelete = autoDelete;
tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path)); tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
// pTSBuf->pFile = fopen(pTSBuf->path, "rb+"); // pTSBuf->pFile = fopen(pTSBuf->path, "rb+");
pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ); pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ);
if (pTSBuf->pFile == NULL) { if (pTSBuf->pFile == NULL) {
free(pTSBuf); free(pTSBuf);
return NULL; return NULL;
} }
if (allocResForTSBuf(pTSBuf) == NULL) { if (allocResForTSBuf(pTSBuf) == NULL) {
return NULL; return NULL;
} }
// validate the file magic number // validate the file magic number
STSBufFileHeader header = {0}; STSBufFileHeader header = {0};
int32_t ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); int32_t ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
UNUSED(ret); UNUSED(ret);
size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader)); size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader));
UNUSED(sz); UNUSED(sz);
...@@ -98,7 +98,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -98,7 +98,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
if (header.numOfGroup > pTSBuf->numOfAlloc) { if (header.numOfGroup > pTSBuf->numOfAlloc) {
pTSBuf->numOfAlloc = header.numOfGroup; pTSBuf->numOfAlloc = header.numOfGroup;
STSGroupBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc); STSGroupBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
...@@ -106,57 +106,58 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -106,57 +106,58 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->pData = tmp; pTSBuf->pData = tmp;
} }
pTSBuf->numOfGroups = header.numOfGroup; pTSBuf->numOfGroups = header.numOfGroup;
// check the ts order // check the ts order
pTSBuf->tsOrder = header.tsOrder; pTSBuf->tsOrder = header.tsOrder;
if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) { if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
// tscError("invalid order info in buf:%d", pTSBuf->tsOrder); // tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups; size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
STSGroupBlockInfo* buf = (STSGroupBlockInfo*)calloc(1, infoSize); STSGroupBlockInfo* buf = (STSGroupBlockInfo*)calloc(1, infoSize);
if (buf == NULL) { if (buf == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
//int64_t pos = ftell(pTSBuf->pFile); //pos not used // int64_t pos = ftell(pTSBuf->pFile); //pos not used
sz = taosReadFile(pTSBuf->pFile, buf, infoSize); sz = taosReadFile(pTSBuf->pFile, buf, infoSize);
UNUSED(sz); UNUSED(sz);
// the length value for each vnode is not kept in file, so does not set the length value // the length value for each vnode is not kept in file, so does not set the length value
for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) { for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i]; STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo)); memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
} }
free(buf); free(buf);
ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END); ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END);
UNUSED(ret); UNUSED(ret);
int64_t file_size; int64_t file_size;
if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) { if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->fileSize = (uint32_t)file_size; pTSBuf->fileSize = (uint32_t)file_size;
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
// ascending by default // ascending by default
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->pFile), // tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path,
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete); // fileno(pTSBuf->pFile),
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
return pTSBuf; return pTSBuf;
} }
...@@ -164,22 +165,22 @@ void* tsBufDestroy(STSBuf* pTSBuf) { ...@@ -164,22 +165,22 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return NULL; return NULL;
} }
tfree(pTSBuf->assistBuf); tfree(pTSBuf->assistBuf);
tfree(pTSBuf->tsData.rawBuf); tfree(pTSBuf->tsData.rawBuf);
tfree(pTSBuf->pData); tfree(pTSBuf->pData);
tfree(pTSBuf->block.payload); tfree(pTSBuf->block.payload);
if (!pTSBuf->remainOpen) { if (!pTSBuf->remainOpen) {
taosCloseFile(&pTSBuf->pFile); taosCloseFile(&pTSBuf->pFile);
} }
if (pTSBuf->autoDelete) { if (pTSBuf->autoDelete) {
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
remove(pTSBuf->path); remove(pTSBuf->path);
} else { } else {
// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); // tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
} }
taosVariantDestroy(&pTSBuf->block.tag); taosVariantDestroy(&pTSBuf->block.tag);
...@@ -189,7 +190,7 @@ void* tsBufDestroy(STSBuf* pTSBuf) { ...@@ -189,7 +190,7 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) { static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
int32_t last = pTSBuf->numOfGroups - 1; int32_t last = pTSBuf->numOfGroups - 1;
assert(last >= 0); assert(last >= 0);
return &pTSBuf->pData[last]; return &pTSBuf->pData[last];
} }
...@@ -198,40 +199,40 @@ static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) { ...@@ -198,40 +199,40 @@ static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) { if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
assert((int32_t)newSize > pTSBuf->numOfAlloc); assert((int32_t)newSize > pTSBuf->numOfAlloc);
STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize); STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
if (tmp == NULL) { if (tmp == NULL) {
return NULL; return NULL;
} }
pTSBuf->pData = tmp; pTSBuf->pData = tmp;
pTSBuf->numOfAlloc = newSize; pTSBuf->numOfAlloc = newSize;
memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups)); memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
} }
if (pTSBuf->numOfGroups > 0) { if (pTSBuf->numOfGroups > 0) {
STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
// update prev vnode length info in file // update prev vnode length info in file
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
} }
// set initial value for vnode block // set initial value for vnode block
STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
pBlockInfo->id = id; pBlockInfo->id = id;
pBlockInfo->offset = pTSBuf->fileSize; pBlockInfo->offset = pTSBuf->fileSize;
assert(pBlockInfo->offset >= getDataStartOffset()); assert(pBlockInfo->offset >= getDataStartOffset());
// update vnode info in file // update vnode info in file
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);
// add one vnode info // add one vnode info
pTSBuf->numOfGroups += 1; pTSBuf->numOfGroups += 1;
// update the header info // update the header info
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
return tsBufGetLastGroupInfo(pTSBuf); return tsBufGetLastGroupInfo(pTSBuf);
} }
...@@ -240,7 +241,7 @@ static void shrinkBuffer(STSList* ptsData) { ...@@ -240,7 +241,7 @@ static void shrinkBuffer(STSList* ptsData) {
// shrink tmp buffer size if it consumes too many memory compared to the pre-defined size // shrink tmp buffer size if it consumes too many memory compared to the pre-defined size
if (ptsData->allocSize >= ptsData->threshold * 2) { if (ptsData->allocSize >= ptsData->threshold * 2) {
char* rawBuf = realloc(ptsData->rawBuf, MEM_BUF_SIZE); char* rawBuf = realloc(ptsData->rawBuf, MEM_BUF_SIZE);
if(rawBuf) { if (rawBuf) {
ptsData->rawBuf = rawBuf; ptsData->rawBuf = rawBuf;
ptsData->allocSize = MEM_BUF_SIZE; ptsData->allocSize = MEM_BUF_SIZE;
} }
...@@ -260,18 +261,17 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -260,18 +261,17 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
if (pTSBuf->tsData.len == 0) { if (pTSBuf->tsData.len == 0) {
return; return;
} }
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
STSList* pTsData = &pTSBuf->tsData; STSList* pTsData = &pTSBuf->tsData;
pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE; pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE;
pBlock->compLen = pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload,
tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len/TSDB_KEYSIZE, pBlock->payload, pTsData->allocSize, pTsData->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET); int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET);
assert(r == 0); assert(r == 0);
/* /*
* format for output data: * format for output data:
* 1. tags, number of ts, size after compressed, payload, size after compressed * 1. tags, number of ts, size after compressed, payload, size after compressed
...@@ -289,10 +289,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -289,10 +289,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) { } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
float tfloat = (float)pBlock->tag.d; float tfloat = (float)pBlock->tag.d;
metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t) pBlock->tag.nLen); metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t) pBlock->tag.nLen); metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
} else { } else {
trueLen = 0; trueLen = 0;
metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
...@@ -303,19 +303,19 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -303,19 +303,19 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen); taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);
taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen)); taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
metaLen += (int32_t) taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
assert(metaLen == getTagAreaLength(&pBlock->tag)); assert(metaLen == getTagAreaLength(&pBlock->tag));
int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
pTSBuf->fileSize += blockSize; pTSBuf->fileSize += blockSize;
pTSBuf->tsData.len = 0; pTSBuf->tsData.len = 0;
STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
pGroupBlockInfoEx->info.compLen += blockSize; pGroupBlockInfoEx->info.compLen += blockSize;
pGroupBlockInfoEx->info.numOfBlocks += 1; pGroupBlockInfoEx->info.numOfBlocks += 1;
shrinkBuffer(&pTSBuf->tsData); shrinkBuffer(&pTSBuf->tsData);
} }
...@@ -326,7 +326,7 @@ static void expandBuffer(STSList* ptsData, int32_t inputSize) { ...@@ -326,7 +326,7 @@ static void expandBuffer(STSList* ptsData, int32_t inputSize) {
if (tmp == NULL) { if (tmp == NULL) {
// todo // todo
} }
ptsData->rawBuf = tmp; ptsData->rawBuf = tmp;
ptsData->allocSize = newSize; ptsData->allocSize = newSize;
} }
...@@ -336,8 +336,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -336,8 +336,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
// clear the memory buffer // clear the memory buffer
pBlock->compLen = 0; pBlock->compLen = 0;
pBlock->padding = 0; pBlock->padding = 0;
pBlock->numOfElem = 0; pBlock->numOfElem = 0;
int32_t offset = -1; int32_t offset = -1;
...@@ -347,11 +347,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -347,11 +347,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
* set the right position for the reversed traverse, the reversed traverse is started from * set the right position for the reversed traverse, the reversed traverse is started from
* the end of each comp data block * the end of each comp data block
*/ */
int32_t prev = -(int32_t) (sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen)); int32_t prev = -(int32_t)(sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen));
int32_t ret = taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR); int32_t ret = taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR);
size_t sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); size_t sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
UNUSED(sz); UNUSED(sz);
pBlock->compLen = pBlock->padding; pBlock->compLen = pBlock->padding;
...@@ -376,11 +376,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -376,11 +376,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
UNUSED(sz); UNUSED(sz);
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) { } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
float tfloat = 0; float tfloat = 0;
sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t) pBlock->tag.nLen); sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
pBlock->tag.d = (double)tfloat; pBlock->tag.d = (double)tfloat;
UNUSED(sz); UNUSED(sz);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { //TODO check the return value } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { // TODO check the return value
sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t) pBlock->tag.nLen); sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
UNUSED(sz); UNUSED(sz);
} }
...@@ -395,7 +395,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -395,7 +395,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
} }
// read the comp length at the length of comp block // read the comp length at the length of comp block
sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
assert(pBlock->padding == pBlock->compLen); assert(pBlock->padding == pBlock->compLen);
...@@ -409,24 +409,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -409,24 +409,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
} }
UNUSED(sz); UNUSED(sz);
// for backwards traverse, set the start position at the end of previous block // for backwards traverse, set the start position at the end of previous block
if (order == TSDB_ORDER_DESC) { if (order == TSDB_ORDER_DESC) {
int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR); int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
UNUSED(r); UNUSED(r);
} }
return pBlock; return pBlock;
} }
// set the order of ts buffer if the ts order has not been set yet // set the order of ts buffer if the ts order has not been set yet
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
STSList* ptsData = &pTSBuf->tsData; STSList* ptsData = &pTSBuf->tsData;
if (pTSBuf->tsOrder == -1) { if (pTSBuf->tsOrder == -1) {
if (ptsData->len > 0) { if (ptsData->len > 0) {
TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE); TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE);
if (lastKey > *(TSKEY*)pData) { if (lastKey > *(TSKEY*)pData) {
pTSBuf->tsOrder = TSDB_ORDER_DESC; pTSBuf->tsOrder = TSDB_ORDER_DESC;
} else { } else {
...@@ -436,7 +436,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { ...@@ -436,7 +436,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
// no data in current vnode, more than one ts is added, check the orders // no data in current vnode, more than one ts is added, check the orders
TSKEY k1 = *(TSKEY*)(pData); TSKEY k1 = *(TSKEY*)(pData);
TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE); TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE);
if (k1 < k2) { if (k1 < k2) {
pTSBuf->tsOrder = TSDB_ORDER_ASC; pTSBuf->tsOrder = TSDB_ORDER_ASC;
} else if (k1 > k2) { } else if (k1 > k2) {
...@@ -448,23 +448,23 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { ...@@ -448,23 +448,23 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
} else { } else {
// todo the timestamp order is set, check the asc/desc order of appended data // todo the timestamp order is set, check the asc/desc order of appended data
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) { void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) {
STSGroupBlockInfoEx* pBlockInfo = NULL; STSGroupBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData; STSList* ptsData = &pTSBuf->tsData;
if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) { if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
writeDataToDisk(pTSBuf); writeDataToDisk(pTSBuf);
shrinkBuffer(ptsData); shrinkBuffer(ptsData);
pBlockInfo = addOneGroupInfo(pTSBuf, id); pBlockInfo = addOneGroupInfo(pTSBuf, id);
} else { } else {
pBlockInfo = tsBufGetLastGroupInfo(pTSBuf); pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
} }
assert(pBlockInfo->info.id == id); assert(pBlockInfo->info.id == id);
if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) { if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
...@@ -476,22 +476,22 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, i ...@@ -476,22 +476,22 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, i
taosVariantAssign(&pTSBuf->block.tag, tag); taosVariantAssign(&pTSBuf->block.tag, tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
// todo check return value // todo check return value
setCheckTSOrder(pTSBuf, pData, len); setCheckTSOrder(pTSBuf, pData, len);
ptsData->len += len; ptsData->len += len;
pBlockInfo->len += len; pBlockInfo->len += len;
pTSBuf->numOfTotal += len / TSDB_KEYSIZE; pTSBuf->numOfTotal += len / TSDB_KEYSIZE;
// the size of raw data exceeds the size of the default prepared buffer, so // the size of raw data exceeds the size of the default prepared buffer, so
// during getBufBlock, the output buffer needs to be large enough. // during getBufBlock, the output buffer needs to be large enough.
if (ptsData->len >= ptsData->threshold) { if (ptsData->len >= ptsData->threshold) {
writeDataToDisk(pTSBuf); writeDataToDisk(pTSBuf);
shrinkBuffer(ptsData); shrinkBuffer(ptsData);
} }
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
} }
...@@ -499,15 +499,15 @@ void tsBufFlush(STSBuf* pTSBuf) { ...@@ -499,15 +499,15 @@ void tsBufFlush(STSBuf* pTSBuf) {
if (pTSBuf->tsData.len <= 0) { if (pTSBuf->tsData.len <= 0) {
return; return;
} }
writeDataToDisk(pTSBuf); writeDataToDisk(pTSBuf);
shrinkBuffer(&pTSBuf->tsData); shrinkBuffer(&pTSBuf->tsData);
STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
// update prev vnode length info in file // update prev vnode length info in file
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info);
// save the ts order into header // save the ts order into header
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
...@@ -522,7 +522,7 @@ static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t num ...@@ -522,7 +522,7 @@ static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t num
break; break;
} }
} }
return j; return j;
} }
...@@ -531,17 +531,17 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int ...@@ -531,17 +531,17 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int
if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) { if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
return -1; return -1;
} }
// sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode
int32_t i = 0; int32_t i = 0;
bool decomp = false; bool decomp = false;
while ((i++) <= blockIndex) { while ((i++) <= blockIndex) {
if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, decomp) == NULL) { if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, decomp) == NULL) {
return -1; return -1;
} }
} }
// set the file position to be the end of previous comp block // set the file position to be the end of previous comp block
if (pTSBuf->cur.order == TSDB_ORDER_DESC) { if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
...@@ -550,34 +550,34 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int ...@@ -550,34 +550,34 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int
int32_t ret = taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR); int32_t ret = taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR);
UNUSED(ret); UNUSED(ret);
} }
return 0; return 0;
} }
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) { static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) {
bool decomp = false; bool decomp = false;
int64_t offset = 0; int64_t offset = 0;
if (pTSBuf->cur.order == TSDB_ORDER_ASC) { if (pTSBuf->cur.order == TSDB_ORDER_ASC) {
offset = pBlockInfo->offset; offset = pBlockInfo->offset;
} else { // reversed traverse starts from the end of block } else { // reversed traverse starts from the end of block
offset = pBlockInfo->offset + pBlockInfo->compLen; offset = pBlockInfo->offset + pBlockInfo->compLen;
} }
if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) { if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
return -1; return -1;
} }
for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) { for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) {
if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) { if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) {
return -1; return -1;
} }
if (taosVariantCompare(&pTSBuf->block.tag, tag) == 0) { if (taosVariantCompare(&pTSBuf->block.tag, tag) == 0) {
return (pTSBuf->cur.order == TSDB_ORDER_ASC)? i: (pBlockInfo->numOfBlocks - (i + 1)); return (pTSBuf->cur.order == TSDB_ORDER_ASC) ? i : (pBlockInfo->numOfBlocks - (i + 1));
} }
} }
return -1; return -1;
} }
...@@ -586,14 +586,14 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex ...@@ -586,14 +586,14 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
if (pBlockInfo->numOfBlocks <= blockIndex) { if (pBlockInfo->numOfBlocks <= blockIndex) {
assert(false); assert(false);
} }
STSCursor* pCur = &pTSBuf->cur; STSCursor* pCur = &pTSBuf->cur;
if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) || if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
(pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) { (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) {
int32_t i = 0; int32_t i = 0;
bool decomp = false; bool decomp = false;
int32_t step = abs(blockIndex - pCur->blockIndex); int32_t step = abs(blockIndex - pCur->blockIndex);
while ((++i) <= step) { while ((++i) <= step) {
if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) { if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) {
return; return;
...@@ -604,11 +604,11 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex ...@@ -604,11 +604,11 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
assert(false); assert(false);
} }
} }
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
size_t s = pBlock->numOfElem * TSDB_KEYSIZE; size_t s = pBlock->numOfElem * TSDB_KEYSIZE;
/* /*
* In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
...@@ -616,16 +616,16 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex ...@@ -616,16 +616,16 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
if (s > pTSBuf->tsData.allocSize) { if (s > pTSBuf->tsData.allocSize) {
expandBuffer(&pTSBuf->tsData, (int32_t)s); expandBuffer(&pTSBuf->tsData, (int32_t)s);
} }
pTSBuf->tsData.len = pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
pCur->vgroupIndex = groupIndex; pCur->vgroupIndex = groupIndex;
pCur->blockIndex = blockIndex; pCur->blockIndex = blockIndex;
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
} }
...@@ -647,7 +647,7 @@ STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) { ...@@ -647,7 +647,7 @@ STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
if (j == -1) { if (j == -1) {
return NULL; return NULL;
} }
return &pTSBuf->pData[j].info; return &pTSBuf->pData[j].info;
} }
...@@ -660,13 +660,14 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { ...@@ -660,13 +660,14 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
if (r != 0) { if (r != 0) {
// qError("fseek failed, errno:%d", errno); // qError("fseek failed, errno:%d", errno);
return -1; return -1;
} }
size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader)); size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader));
if (ws != 1) { if (ws != 1) {
// qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, (int32_t)sizeof(STSBufFileHeader)); // qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws,
// (int32_t)sizeof(STSBufFileHeader));
return -1; return -1;
} }
return 0; return 0;
...@@ -676,33 +677,33 @@ bool tsBufNextPos(STSBuf* pTSBuf) { ...@@ -676,33 +677,33 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) { if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
return false; return false;
} }
STSCursor* pCur = &pTSBuf->cur; STSCursor* pCur = &pTSBuf->cur;
// get the first/last position according to traverse order // get the first/last position according to traverse order
if (pCur->vgroupIndex == -1) { if (pCur->vgroupIndex == -1) {
if (pCur->order == TSDB_ORDER_ASC) { if (pCur->order == TSDB_ORDER_ASC) {
tsBufGetBlock(pTSBuf, 0, 0); tsBufGetBlock(pTSBuf, 0, 0);
if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
return false; return false;
} else { } else {
return true; return true;
} }
} else { // get the last timestamp record in the last block of the last vnode } else { // get the last timestamp record in the last block of the last vnode
assert(pTSBuf->numOfGroups > 0); assert(pTSBuf->numOfGroups > 0);
int32_t groupIndex = pTSBuf->numOfGroups - 1; int32_t groupIndex = pTSBuf->numOfGroups - 1;
pCur->vgroupIndex = groupIndex; pCur->vgroupIndex = groupIndex;
int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id; int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;
STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id); STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
int32_t blockIndex = pBlockInfo->numOfBlocks - 1; int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
tsBufGetBlock(pTSBuf, groupIndex, blockIndex); tsBufGetBlock(pTSBuf, groupIndex, blockIndex);
pCur->tsIndex = pTSBuf->block.numOfElem - 1; pCur->tsIndex = pTSBuf->block.numOfElem - 1;
if (pTSBuf->block.numOfElem == 0) { if (pTSBuf->block.numOfElem == 0) {
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
...@@ -712,16 +713,16 @@ bool tsBufNextPos(STSBuf* pTSBuf) { ...@@ -712,16 +713,16 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
} }
} }
} }
int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1; int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1;
while (1) { while (1) {
assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE); assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);
if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
(pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) { (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) {
int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id; int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;
STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id); STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) || if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
(pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
...@@ -730,15 +731,15 @@ bool tsBufNextPos(STSBuf* pTSBuf) { ...@@ -730,15 +731,15 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
pCur->vgroupIndex = -1; pCur->vgroupIndex = -1;
return false; return false;
} }
if (pBlockInfo == NULL) { if (pBlockInfo == NULL) {
return false; return false;
} }
int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1); int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1);
tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex); tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex);
break; break;
} else { } else {
tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step); tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
break; break;
...@@ -748,7 +749,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) { ...@@ -748,7 +749,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
break; break;
} }
} }
return true; return true;
} }
...@@ -756,7 +757,7 @@ void tsBufResetPos(STSBuf* pTSBuf) { ...@@ -756,7 +757,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return; return;
} }
pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order}; pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order};
} }
...@@ -765,14 +766,14 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { ...@@ -765,14 +766,14 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return elem1; return elem1;
} }
STSCursor* pCur = &pTSBuf->cur; STSCursor* pCur = &pTSBuf->cur;
if (pCur != NULL && pCur->vgroupIndex < 0) { if (pCur != NULL && pCur->vgroupIndex < 0) {
return elem1; return elem1;
} }
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id; elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id;
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
elem1.tag = &pBlock->tag; elem1.tag = &pBlock->tag;
...@@ -791,65 +792,65 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { ...@@ -791,65 +792,65 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) { if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
return 0; return 0;
} }
if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) { if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
return -1; return -1;
} }
// src can only have one vnode index // src can only have one vnode index
assert(pSrcBuf->numOfGroups == 1); assert(pSrcBuf->numOfGroups == 1);
// there are data in buffer, flush to disk first // there are data in buffer, flush to disk first
tsBufFlush(pDestBuf); tsBufFlush(pDestBuf);
// compared with the last vnode id // compared with the last vnode id
int32_t id = tsBufGetLastGroupInfo((STSBuf*) pSrcBuf)->info.id; int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id;
if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) { if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) {
int32_t oldSize = pDestBuf->numOfGroups; int32_t oldSize = pDestBuf->numOfGroups;
int32_t newSize = oldSize + pSrcBuf->numOfGroups; int32_t newSize = oldSize + pSrcBuf->numOfGroups;
if (pDestBuf->numOfAlloc < newSize) { if (pDestBuf->numOfAlloc < newSize) {
pDestBuf->numOfAlloc = newSize; pDestBuf->numOfAlloc = newSize;
STSGroupBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize); STSGroupBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
if (tmp == NULL) { if (tmp == NULL) {
return -1; return -1;
} }
pDestBuf->pData = tmp; pDestBuf->pData = tmp;
} }
// directly copy the vnode index information // directly copy the vnode index information
memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx)); memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));
// set the new offset value // set the new offset value
for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) { for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize]; STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize; pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
pBlockInfoEx->info.id = id; pBlockInfoEx->info.id = id;
} }
pDestBuf->numOfGroups = newSize; pDestBuf->numOfGroups = newSize;
} else { } else {
STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf); STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);
pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->len += pSrcBuf->pData[0].len;
pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
pBlockInfoEx->info.id = id; pBlockInfoEx->info.id = id;
} }
int32_t r = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END); int32_t r = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END);
assert(r == 0); assert(r == 0);
int64_t offset = getDataStartOffset(); int64_t offset = getDataStartOffset();
int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset; int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size); int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size);
if (written == -1 || written != size) { if (written == -1 || written != size) {
return -1; return -1;
} }
pDestBuf->numOfTotal += pSrcBuf->numOfTotal; pDestBuf->numOfTotal += pSrcBuf->numOfTotal;
int32_t oldSize = pDestBuf->fileSize; int32_t oldSize = pDestBuf->fileSize;
...@@ -864,7 +865,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { ...@@ -864,7 +865,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t file_size; int64_t file_size;
if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) { if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) {
return -1; return -1;
} }
pDestBuf->fileSize = (uint32_t)file_size; pDestBuf->fileSize = (uint32_t)file_size;
...@@ -875,33 +876,33 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { ...@@ -875,33 +876,33 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) { STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
STSBuf* pTSBuf = tsBufCreate(true, order); STSBuf* pTSBuf = tsBufCreate(true, order);
STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info); STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
pBlockInfo->numOfBlocks = numOfBlocks; pBlockInfo->numOfBlocks = numOfBlocks;
pBlockInfo->compLen = len; pBlockInfo->compLen = len;
pBlockInfo->offset = getDataStartOffset(); pBlockInfo->offset = getDataStartOffset();
pBlockInfo->id = id; pBlockInfo->id = id;
// update prev vnode length info in file // update prev vnode length info in file
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET); int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET);
if (ret == -1) { if (ret == -1) {
// qError("fseek failed, errno:%d", errno); // qError("fseek failed, errno:%d", errno);
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len); size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len);
if (sz != len) { if (sz != len) {
// qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len); // qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->fileSize += len; pTSBuf->fileSize += len;
pTSBuf->tsOrder = order; pTSBuf->tsOrder = order;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
if (STSBufUpdateHeader(pTSBuf, &header) < 0) { if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
...@@ -910,42 +911,42 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ ...@@ -910,42 +911,42 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
} }
// TODO taosFsync?? // TODO taosFsync??
// if (taosFsync(fileno(pTSBuf->pFile)) == -1) { // if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
//// qError("fsync failed, errno:%d", errno); //// qError("fsync failed, errno:%d", errno);
// tsBufDestroy(pTSBuf); // tsBufDestroy(pTSBuf);
// return NULL; // return NULL;
// } // }
return pTSBuf; return pTSBuf;
} }
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) { STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) {
STSElem elem = {.id = -1}; STSElem elem = {.id = -1};
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return elem; return elem;
} }
int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id); int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
if (j == -1) { if (j == -1) {
return elem; return elem;
} }
// for debug purpose // for debug purpose
// tsBufDisplay(pTSBuf); // tsBufDisplay(pTSBuf);
STSCursor* pCur = &pTSBuf->cur; STSCursor* pCur = &pTSBuf->cur;
STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[j].info; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[j].info;
int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag); int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag);
if (blockIndex < 0) { if (blockIndex < 0) {
return elem; return elem;
} }
pCur->vgroupIndex = j; pCur->vgroupIndex = j;
pCur->blockIndex = blockIndex; pCur->blockIndex = blockIndex;
tsBufGetBlock(pTSBuf, j, blockIndex); tsBufGetBlock(pTSBuf, j, blockIndex);
return tsBufGetElem(pTSBuf); return tsBufGetElem(pTSBuf);
} }
...@@ -954,7 +955,7 @@ STSCursor tsBufGetCursor(STSBuf* pTSBuf) { ...@@ -954,7 +955,7 @@ STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return c; return c;
} }
return pTSBuf->cur; return pTSBuf->cur;
} }
...@@ -962,12 +963,12 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) { ...@@ -962,12 +963,12 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
if (pTSBuf == NULL || pCur == NULL) { if (pTSBuf == NULL || pCur == NULL) {
return; return;
} }
// assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0); // assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0);
if (pCur->vgroupIndex != -1) { if (pCur->vgroupIndex != -1) {
tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex); tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
} }
pTSBuf->cur = *pCur; pTSBuf->cur = *pCur;
} }
...@@ -975,7 +976,7 @@ void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) { ...@@ -975,7 +976,7 @@ void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return; return;
} }
pTSBuf->cur.order = order; pTSBuf->cur.order = order;
} }
...@@ -992,19 +993,19 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) { ...@@ -992,19 +993,19 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) {
void tsBufDisplay(STSBuf* pTSBuf) { void tsBufDisplay(STSBuf* pTSBuf) {
printf("-------start of ts comp file-------\n"); printf("-------start of ts comp file-------\n");
printf("number of vnode:%d\n", pTSBuf->numOfGroups); printf("number of vnode:%d\n", pTSBuf->numOfGroups);
int32_t old = pTSBuf->cur.order; int32_t old = pTSBuf->cur.order;
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
while (tsBufNextPos(pTSBuf)) { while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) { if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts); printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts);
} }
} }
pTSBuf->cur.order = old; pTSBuf->cur.order = old;
printf("-------end of ts comp file-------\n"); printf("-------end of ts comp file-------\n");
} }
...@@ -1021,36 +1022,36 @@ static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInf ...@@ -1021,36 +1022,36 @@ static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInf
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
const int32_t INITIAL_GROUPINFO_SIZE = 4; const int32_t INITIAL_GROUPINFO_SIZE = 4;
pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE; pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE;
pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx)); pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
if (pTSBuf->pData == NULL) { if (pTSBuf->pData == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE); pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
if (pTSBuf->tsData.rawBuf == NULL) { if (pTSBuf->tsData.rawBuf == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->bufSize = MEM_BUF_SIZE; pTSBuf->bufSize = MEM_BUF_SIZE;
pTSBuf->tsData.threshold = MEM_BUF_SIZE; pTSBuf->tsData.threshold = MEM_BUF_SIZE;
pTSBuf->tsData.allocSize = MEM_BUF_SIZE; pTSBuf->tsData.allocSize = MEM_BUF_SIZE;
pTSBuf->assistBuf = malloc(MEM_BUF_SIZE); pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
if (pTSBuf->assistBuf == NULL) { if (pTSBuf->assistBuf == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->block.payload = malloc(MEM_BUF_SIZE); pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
if (pTSBuf->block.payload == NULL) { if (pTSBuf->block.payload == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->fileSize += getDataStartOffset(); pTSBuf->fileSize += getDataStartOffset();
return pTSBuf; return pTSBuf;
} }
...@@ -1076,28 +1077,28 @@ void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) { ...@@ -1076,28 +1077,28 @@ void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
(*id) = malloc(tsBufGetNumOfGroup(pTSBuf) * sizeof(int32_t)); (*id) = malloc(tsBufGetNumOfGroup(pTSBuf) * sizeof(int32_t));
for(int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
(*id)[i] = pTSBuf->pData[i].info.id; (*id)[i] = pTSBuf->pData[i].info.id;
} }
} }
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) { int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups); assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
STSGroupBlockInfo *pBlockInfo = &pTSBuf->pData[groupIndex].info; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;
*len = 0; *len = 0;
*numOfBlocks = 0; *numOfBlocks = 0;
if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) { if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
int code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile)); int32_t code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile));
// qError("%p: fseek failed: %s", pSql, tstrerror(code)); // qError("%p: fseek failed: %s", pSql, tstrerror(code));
return code; return code;
} }
size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen); size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen);
if (s != pBlockInfo->compLen) { if (s != pBlockInfo->compLen) {
int code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile)); int32_t code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile));
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code)); // tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
return code; return code;
} }
...@@ -1120,6 +1121,4 @@ STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) { ...@@ -1120,6 +1121,4 @@ STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) {
return el; return el;
} }
bool tsBufIsValidElem(STSElem* pElem) { bool tsBufIsValidElem(STSElem* pElem) { return pElem->id >= 0; }
return pElem->id >= 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册