未验证 提交 003a97e7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1913 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -3620,299 +3620,6 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3620,299 +3620,6 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
static void getStatics_i8(int64_t *primaryKey, int32_t type, int8_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int8_t lastVal = TSDB_DATA_TINYINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((char *)&data[i], type)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (type != TSDB_DATA_TYPE_BOOL) { // ignore the bool data type pre-calculation
// if (isNull((char *)&lastVal, type)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
// }
}
}
static void getStatics_i16(int64_t *primaryKey, int16_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int16_t lastVal = TSDB_DATA_SMALLINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_SMALLINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_SMALLINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i32(int64_t *primaryKey, int32_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int32_t lastVal = TSDB_DATA_INT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_INT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_INT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i64(int64_t *primaryKey, int64_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_BIGINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_BIGINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_f(int64_t *primaryKey, float *data, int32_t numOfRow, double *min, double *max, double *sum,
int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
float fmin = DBL_MAX;
float fmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_FLOAT)) {
(*numOfNull) += 1;
continue;
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
*minIndex = i;
}
if (fmax < fv) {
fmax = fv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_FLOAT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*sum = csum;
*max = fmax;
*min = fmin;
#endif
}
static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, double *min, double *max, double *sum,
int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
double dmin = DBL_MAX;
double dmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_DOUBLE)) {
(*numOfNull) += 1;
continue;
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
*minIndex = i;
}
if (dmax < dv) {
dmax = dv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_DOUBLE)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*sum = csum;
*max = dmax;
*min = dmin;
#endif
}
void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
int64_t *primaryKey = (int64_t *)priData;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull(data + i * size, type)) {
(*numOfNull) += 1;
continue;
}
}
} else {
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
getStatics_i8(primaryKey, type, (int8_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
getStatics_i16(primaryKey, (int16_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_INT) {
getStatics_i32(primaryKey, (int32_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) {
getStatics_i64(primaryKey, (int64_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
getStatics_d(primaryKey, (double *)data, numOfRow, (double*) min, (double*) max, (double*) sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_FLOAT) {
getStatics_f(primaryKey, (float *)data, numOfRow, (double*) min, (double*) max, (double*) sum, minIndex, maxIndex, numOfNull);
}
}
}
/** /**
* param[1]: start time * param[1]: start time
......
...@@ -32,18 +32,280 @@ const int32_t TYPE_BYTES[11] = { ...@@ -32,18 +32,280 @@ const int32_t TYPE_BYTES[11] = {
sizeof(VarDataOffsetT) // TSDB_DATA_TYPE_NCHAR sizeof(VarDataOffsetT) // TSDB_DATA_TYPE_NCHAR
}; };
static void getStatics_i8(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int8_t *data = (int8_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int8_t lastVal = TSDB_DATA_TINYINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((char *)&data[i], TSDB_DATA_TYPE_TINYINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
}
}
static void getStatics_i16(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int16_t *data = (int16_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int16_t lastVal = TSDB_DATA_SMALLINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_SMALLINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_SMALLINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i32(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int32_t *data = (int32_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int32_t lastVal = TSDB_DATA_INT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_INT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_INT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i64(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int64_t *data = (int64_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_BIGINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_BIGINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
float *data = (float *)pData;
float fmin = DBL_MAX;
float fmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_FLOAT)) {
(*numOfNull) += 1;
continue;
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
*minIndex = i;
}
if (fmax < fv) {
fmax = fv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_FLOAT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*sum = csum;
*max = fmax;
*min = fmin;
#endif
}
static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
double *data = (double *)pData;
double dmin = DBL_MAX;
double dmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_DOUBLE)) {
(*numOfNull) += 1;
continue;
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
*minIndex = i;
}
if (dmax < dv) {
dmax = dv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_DOUBLE)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*sum = csum;
*max = dmax;
*min = dmin;
#endif
}
tDataTypeDescriptor tDataTypeDesc[11] = { tDataTypeDescriptor tDataTypeDesc[11] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL}, {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool}, {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, NULL},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint}, {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint}, {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt}, {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt, getStatics_i32},
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint}, {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat}, {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble}, {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString}, {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, NULL},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp}, {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, NULL},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString}, {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, NULL},
}; };
char tTokenTypeSwitcher[13] = { char tTokenTypeSwitcher[13] = {
......
...@@ -147,6 +147,8 @@ typedef struct tDataTypeDescriptor { ...@@ -147,6 +147,8 @@ typedef struct tDataTypeDescriptor {
char algorithm, char *const buffer, int bufferSize); char algorithm, char *const buffer, int bufferSize);
int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output, int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize); int outputSize, char algorithm, char *const buffer, int bufferSize);
void (*getStatisFunc)(const TSKEY *primaryKey, const void *pData, int32_t numofrow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
} tDataTypeDescriptor; } tDataTypeDescriptor;
extern tDataTypeDescriptor tDataTypeDesc[11]; extern tDataTypeDescriptor tDataTypeDesc[11];
......
...@@ -72,7 +72,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside ...@@ -72,7 +72,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(TsdbRepoT *repo); int32_t tsdbDropRepo(TsdbRepoT *repo);
TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH); TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TsdbRepoT *repo); int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit);
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
......
...@@ -325,6 +325,13 @@ typedef struct { ...@@ -325,6 +325,13 @@ typedef struct {
int16_t len; // Column length // TODO: int16_t is not enough int16_t len; // Column length // TODO: int16_t is not enough
int32_t type : 8; int32_t type : 8;
int32_t offset : 24; int32_t offset : 24;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
char padding[2];
} SCompCol; } SCompCol;
// TODO: Take recover into account // TODO: Take recover into account
......
...@@ -258,7 +258,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -258,7 +258,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
* *
* @return 0 for success, -1 for failure and the error number is set * @return 0 for success, -1 for failure and the error number is set
*/ */
int32_t tsdbCloseRepo(TsdbRepoT *repo) { int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return 0; if (pRepo == NULL) return 0;
int id = pRepo->config.tsdbId; int id = pRepo->config.tsdbId;
...@@ -285,7 +285,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { ...@@ -285,7 +285,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
tsdbUnLockRepo(repo); tsdbUnLockRepo(repo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
tsdbCommitData((void *)repo); if (toCommit) tsdbCommitData((void *)repo);
tsdbCloseFileH(pRepo->tsdbFileH); tsdbCloseFileH(pRepo->tsdbFileH);
...@@ -840,7 +840,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -840,7 +840,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
tsdbTrace("vgId:%d, tid:%d, uid:" PRId64 ", a row is inserted to table! key:" PRId64, tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", a row is inserted to table! key:%" PRId64,
pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, dataRowKey(row)); pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, dataRowKey(row));
return 0; return 0;
...@@ -1018,10 +1018,16 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1018,10 +1018,16 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Create and open files for commit // Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir); tsdbGetDataDirName(pRepo, dataDir);
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err; if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid);
goto _err;
}
// Open files for write/read // Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err; if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId);
goto _err;
}
// Loop to commit data in each table // Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) { for (int tid = 1; tid < pCfg->maxTables; tid++) {
...@@ -1058,13 +1064,22 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1058,13 +1064,22 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
ASSERT(pDataCols->numOfPoints == 0); ASSERT(pDataCols->numOfPoints == 0);
// Move the last block to the new .l file if neccessary // Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err; if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId);
goto _err;
}
// Write the SCompBlock part // Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) goto _err; if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId);
goto _err;
}
} }
if (tsdbWriteCompIdx(pHelper) < 0) goto _err; if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId);
goto _err;
}
tsdbCloseHelperFile(pHelper, 0); tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods // TODO: make it atomic with some methods
......
...@@ -356,11 +356,11 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { ...@@ -356,11 +356,11 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
// Register to meta // Register to meta
if (newSuper) { if (newSuper) {
tsdbAddTableToMeta(pMeta, super, true); tsdbAddTableToMeta(pMeta, super, true);
tsdbTrace("vgId:%d, super table is created! uid:" PRId64, pRepo->config.tsdbId, tsdbTrace("vgId:%d, super table is created! uid:%" PRId64, pRepo->config.tsdbId,
super->tableId.uid); super->tableId.uid);
} }
tsdbAddTableToMeta(pMeta, table, true); tsdbAddTableToMeta(pMeta, table, true);
tsdbTrace("vgId:%d, table is created! tid:%d, uid:" PRId64, pRepo->config.tsdbId, table->tableId.tid, tsdbTrace("vgId:%d, table is created! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, table->tableId.tid,
table->tableId.uid); table->tableId.uid);
// Write to meta file // Write to meta file
...@@ -409,7 +409,7 @@ int tsdbDropTable(TsdbRepoT *repo, STableId tableId) { ...@@ -409,7 +409,7 @@ int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
return -1; return -1;
} }
tsdbTrace("vgId:%d, table is dropped! tid:%d, uid:" PRId64, pRepo->config.tsdbId, tableId.tid, tableId.uid); tsdbTrace("vgId:%d, table is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tableId.tid, tableId.uid);
if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1; if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;
return 0; return 0;
......
...@@ -703,6 +703,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -703,6 +703,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompCol->colId = pDataCol->colId; pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type; pCompCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
}
nColsNotAllNull++; nColsNotAllNull++;
} }
......
...@@ -380,7 +380,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -380,7 +380,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
cqClose(pVnode->cq); cqClose(pVnode->cq);
pVnode->cq = NULL; pVnode->cq = NULL;
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
walClose(pVnode->wal); walClose(pVnode->wal);
...@@ -431,8 +431,8 @@ static void vnodeNotifyFileSynced(void *ahandle) { ...@@ -431,8 +431,8 @@ static void vnodeNotifyFileSynced(void *ahandle) {
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// close tsdb, then open tsdb // clsoe tsdb, then open tsdb
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb, 0);
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus; appH.notifyStatus = vnodeProcessTsdbStatus;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册