提交 f640afa3 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'develop' into hotfix/savedVersion

......@@ -3620,299 +3620,6 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static void getStatics_i8(int64_t *primaryKey, int32_t type, int8_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int8_t lastVal = TSDB_DATA_TINYINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((char *)&data[i], type)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (type != TSDB_DATA_TYPE_BOOL) { // ignore the bool data type pre-calculation
// if (isNull((char *)&lastVal, type)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
// }
}
}
static void getStatics_i16(int64_t *primaryKey, int16_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int16_t lastVal = TSDB_DATA_SMALLINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_SMALLINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_SMALLINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i32(int64_t *primaryKey, int32_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int32_t lastVal = TSDB_DATA_INT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_INT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_INT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i64(int64_t *primaryKey, int64_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_BIGINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_BIGINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_f(int64_t *primaryKey, float *data, int32_t numOfRow, double *min, double *max, double *sum,
int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
float fmin = DBL_MAX;
float fmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_FLOAT)) {
(*numOfNull) += 1;
continue;
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
*minIndex = i;
}
if (fmax < fv) {
fmax = fv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_FLOAT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*sum = csum;
*max = fmax;
*min = fmin;
#endif
}
static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, double *min, double *max, double *sum,
int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
double dmin = DBL_MAX;
double dmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
assert(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_DOUBLE)) {
(*numOfNull) += 1;
continue;
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
*minIndex = i;
}
if (dmax < dv) {
dmax = dv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_DOUBLE)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*sum = csum;
*max = dmax;
*min = dmin;
#endif
}
void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
int64_t *primaryKey = (int64_t *)priData;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull(data + i * size, type)) {
(*numOfNull) += 1;
continue;
}
}
} else {
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
getStatics_i8(primaryKey, type, (int8_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
getStatics_i16(primaryKey, (int16_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_INT) {
getStatics_i32(primaryKey, (int32_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) {
getStatics_i64(primaryKey, (int64_t *)data, numOfRow, min, max, sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
getStatics_d(primaryKey, (double *)data, numOfRow, (double*) min, (double*) max, (double*) sum, minIndex, maxIndex, numOfNull);
} else if (type == TSDB_DATA_TYPE_FLOAT) {
getStatics_f(primaryKey, (float *)data, numOfRow, (double*) min, (double*) max, (double*) sum, minIndex, maxIndex, numOfNull);
}
}
}
/**
* param[1]: start time
......
......@@ -32,18 +32,280 @@ const int32_t TYPE_BYTES[11] = {
sizeof(VarDataOffsetT) // TSDB_DATA_TYPE_NCHAR
};
static void getStatics_i8(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int8_t *data = (int8_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int8_t lastVal = TSDB_DATA_TINYINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((char *)&data[i], TSDB_DATA_TYPE_TINYINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
}
}
static void getStatics_i16(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int16_t *data = (int16_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int16_t lastVal = TSDB_DATA_SMALLINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_SMALLINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_SMALLINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i32(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int32_t *data = (int32_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
// int64_t lastKey = 0;
// int32_t lastVal = TSDB_DATA_INT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_INT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_INT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_i64(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
int64_t *data = (int64_t *)pData;
*min = INT64_MAX;
*max = INT64_MIN;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_BIGINT)) {
(*numOfNull) += 1;
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_BIGINT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
float *data = (float *)pData;
float fmin = DBL_MAX;
float fmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_FLOAT)) {
(*numOfNull) += 1;
continue;
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
*minIndex = i;
}
if (fmax < fv) {
fmax = fv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_FLOAT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*sum = csum;
*max = fmax;
*min = fmin;
#endif
}
static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
double *data = (double *)pData;
double dmin = DBL_MAX;
double dmax = -DBL_MAX;
double dsum = 0;
*minIndex = 0;
*maxIndex = 0;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) &data[i], TSDB_DATA_TYPE_DOUBLE)) {
(*numOfNull) += 1;
continue;
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
*minIndex = i;
}
if (dmax < dv) {
dmax = dv;
*maxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_DOUBLE)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*sum = csum;
*max = dmax;
*min = dmin;
#endif
}
tDataTypeDescriptor tDataTypeDesc[11] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint},
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt},
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString},
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, NULL},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt, getStatics_i32},
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, NULL},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, NULL},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, NULL},
};
char tTokenTypeSwitcher[13] = {
......
......@@ -147,6 +147,8 @@ typedef struct tDataTypeDescriptor {
char algorithm, char *const buffer, int bufferSize);
int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
void (*getStatisFunc)(const TSKEY *primaryKey, const void *pData, int32_t numofrow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
} tDataTypeDescriptor;
extern tDataTypeDescriptor tDataTypeDesc[11];
......
......@@ -72,7 +72,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(TsdbRepoT *repo);
TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TsdbRepoT *repo);
int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit);
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION
......@@ -140,7 +140,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid);
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(TsdbRepoT *pRepo, SSubmitMsg *pMsg);
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) ;
// -- FOR QUERY TIME SERIES DATA
......
......@@ -1460,15 +1460,15 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId);
mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb);
return;
}
if (pMsg->retry == 0) {
if (pMsg->pTable == NULL) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId);
mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb);
return;
}
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
mgmtIncTableRef(pMsg->pTable);
}
......
......@@ -325,6 +325,13 @@ typedef struct {
int16_t len; // Column length // TODO: int16_t is not enough
int32_t type : 8;
int32_t offset : 24;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
char padding[2];
} SCompCol;
// TODO: Take recover into account
......
......@@ -29,7 +29,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
// static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now);
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int * affectedrows);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitData(void *arg);
......@@ -258,7 +258,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t tsdbCloseRepo(TsdbRepoT *repo) {
int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return 0;
int id = pRepo->config.tsdbId;
......@@ -285,7 +285,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
tsdbUnLockRepo(repo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
tsdbCommitData((void *)repo);
if (toCommit) tsdbCommitData((void *)repo);
tsdbCloseFileH(pRepo->tsdbFileH);
......@@ -406,22 +406,23 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
}
// TODO: need to return the number of data inserted
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg) {
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) {
SSubmitMsgIter msgIter;
STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbInitSubmitMsgIter(pMsg, &msgIter);
SSubmitBlk *pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t affectedrows = 0;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if ((code = tsdbInsertDataToTable(repo, pBlock, now)) != TSDB_CODE_SUCCESS) {
if ((code = tsdbInsertDataToTable(repo, pBlock, now, &affectedrows)) != TSDB_CODE_SUCCESS) {
return code;
}
}
pRsp->affectedRows = htonl(affectedrows);
return code;
}
......@@ -840,13 +841,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
tsdbTrace("vgId:%d, tid:%d, uid:" PRId64 ", a row is inserted to table! key:" PRId64,
tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", a row is inserted to table! key:%" PRId64,
pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, dataRowKey(row));
return 0;
}
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now) {
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
......@@ -875,6 +876,7 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
return -1;
}
(*affectedrows)++;
}
return TSDB_CODE_SUCCESS;
......@@ -1018,10 +1020,16 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err;
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid);
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err;
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId);
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) {
......@@ -1058,13 +1066,22 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
ASSERT(pDataCols->numOfPoints == 0);
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err;
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId);
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) goto _err;
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId);
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) goto _err;
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId);
goto _err;
}
tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods
......
......@@ -356,11 +356,11 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
// Register to meta
if (newSuper) {
tsdbAddTableToMeta(pMeta, super, true);
tsdbTrace("vgId:%d, super table is created! uid:" PRId64, pRepo->config.tsdbId,
tsdbTrace("vgId:%d, super table is created! uid:%" PRId64, pRepo->config.tsdbId,
super->tableId.uid);
}
tsdbAddTableToMeta(pMeta, table, true);
tsdbTrace("vgId:%d, table is created! tid:%d, uid:" PRId64, pRepo->config.tsdbId, table->tableId.tid,
tsdbTrace("vgId:%d, table is created! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, table->tableId.tid,
table->tableId.uid);
// Write to meta file
......@@ -409,7 +409,7 @@ int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
return -1;
}
tsdbTrace("vgId:%d, table is dropped! tid:%d, uid:" PRId64, pRepo->config.tsdbId, tableId.tid, tableId.uid);
tsdbTrace("vgId:%d, table is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tableId.tid, tableId.uid);
if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;
return 0;
......
......@@ -703,6 +703,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
}
nColsNotAllNull++;
}
......
......@@ -381,7 +381,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
cqClose(pVnode->cq);
pVnode->cq = NULL;
tsdbCloseRepo(pVnode->tsdb);
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
walClose(pVnode->wal);
......@@ -437,9 +437,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// close tsdb, then open tsdb
tsdbCloseRepo(pVnode->tsdb);
// clsoe tsdb, then open tsdb
tsdbCloseRepo(pVnode->tsdb, 0);
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus;
......
......@@ -91,17 +91,16 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
// save insert result into item
vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
code = tsdbInsertData(pVnode->tsdb, pCont);
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
SShellSubmitRspMsg *pRsp = pRet->rsp;
code = tsdbInsertData(pVnode->tsdb, pCont, pRsp);
pRsp->numOfFailedBlocks = 0; //TODO
//pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO
pRsp->code = 0;
pRsp->numOfRows = htonl(1);
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
return code;
}
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute(
"create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))")
tdSql.execute(
'CREATE TABLE if not exists dev_001 using st tags("dev_01")')
tdSql.execute(
'CREATE TABLE if not exists dev_002 using st tags("dev_02")')
print("==============step2")
tdSql.execute(
"""INSERT INTO dev_001(ts, tagtype) VALUES('2020-05-13 10:00:00.000', 1),
('2020-05-13 10:00:00.001', 1)
dev_002 VALUES('2020-05-13 10:00:00.001', 1)""")
tdSql.query("select * from db.st where ts='2020-05-13 10:00:00.000'")
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute(
"create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))")
tdSql.execute(
'CREATE TABLE if not exists dev_001 using st tags("dev_01")')
tdSql.execute(
'CREATE TABLE if not exists dev_002 using st tags("dev_02")')
print("==============step2")
tdSql.execute(
"""INSERT INTO dev_001(ts, tagtype) VALUES('2020-05-13 10:00:00.000', 1),
('2020-05-13 10:00:00.001', 1)
dev_002 VALUES('2020-05-13 10:00:00.001', 1)""")
tdSql.query("select * from db.st where dev='dev_01'")
tdSql.checkRows(2)
tdSql.query("select * from db.st where dev='dev_02'")
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -38,9 +38,9 @@ class TDSimClient:
tdLog.exit(cmd)
def deploy(self):
self.logDir = "%s/sim/psim/log" % (self.path,)
self.cfgDir = "%s/sim/psim/cfg" % (self.path)
self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path)
self.logDir = "%s/pysim/psim/log" % (self.path,)
self.cfgDir = "%s/pysim/psim/cfg" % (self.path)
self.cfgPath = "%s/pysim/psim/cfg/taos.cfg" % (self.path)
cmd = "rm -rf " + self.logDir
if os.system(cmd) != 0:
......@@ -100,10 +100,10 @@ class TDDnode:
self.valgrind = value
def deploy(self):
self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index)
self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index)
self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index)
self.cfgPath = "%s/sim/dnode%d/cfg/taos.cfg" % (self.path, self.index)
self.logDir = "%s/pysim/dnode%d/log" % (self.path, self.index)
self.dataDir = "%s/pysim/dnode%d/data" % (self.path, self.index)
self.cfgDir = "%s/pysim/dnode%d/cfg" % (self.path, self.index)
self.cfgPath = "%s/pysim/dnode%d/cfg/taos.cfg" % (self.path, self.index)
cmd = "rm -rf " + self.dataDir
if os.system(cmd) != 0:
......@@ -177,21 +177,42 @@ class TDDnode:
(self.index, self.cfgPath))
def start(self):
binPath = os.path.dirname(os.path.realpath(__file__))
binPath = binPath + "/../../../debug/"
binPath = os.path.realpath(binPath)
binPath += "/build/bin/"
selfPath = os.path.dirname(os.path.realpath(__file__))
binPath = ""
if ("TDinternal" in selfPath):
projPath = selfPath + "/../../../../"
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("community" not in rootRealPath):
binPath = os.path.join(root, "taosd")
break;
else:
projPath = selfPath + "/../../../"
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
binPath = os.path.join(root, "taosd")
break;
if (binPath == ""):
tdLog.exit("taosd not found!s")
else:
tdLog.notice("taosd found in %s" % rootRealPath)
if self.deployed == 0:
tdLog.exit("dnode:%d is not deployed" % (self.index))
if self.valgrind == 0:
cmd = "nohup %staosd -c %s > /dev/null 2>&1 & " % (
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
binPath, self.cfgDir)
else:
valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"
cmd = "nohup %s %staosd -c %s 2>&1 & " % (
cmd = "nohup %s %s -c %s 2>&1 & " % (
valgrindCmdline, binPath, self.cfgDir)
print(cmd)
......@@ -268,11 +289,11 @@ class TDDnode:
tdLog.exit(cmd)
def getDnodeRootDir(self, index):
dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index)
dnodeRootDir = "%s/pysim/psim/dnode%d" % (self.path, index)
return dnodeRootDir
def getDnodesRootDir(self):
dnodesRootDir = "%s/sim/psim" % (self.path)
dnodesRootDir = "%s/pysim/psim" % (self.path)
return dnodesRootDir
......@@ -417,7 +438,7 @@ class TDDnodes:
# tdLog.exit(cmd)
def getDnodesRootDir(self):
dnodesRootDir = "%s/sim" % (self.path)
dnodesRootDir = "%s/pysim" % (self.path)
return dnodesRootDir
def getSimCfgPath(self):
......
......@@ -20,6 +20,12 @@ while $i < 2000
$i = $i + 1
endw
sql show db.vgroups
if $rows != 2 then
return -1
endi
return
print ======== step2
sleep 1000
sql drop database db
......
......@@ -49,7 +49,7 @@ cd ../../../debug; make
./test.sh -f general/db/basic3.sim
./test.sh -f general/db/basic4.sim
./test.sh -f general/db/basic5.sim
./test.sh -f unique/db/delete.sim
./test.sh -f general/db/delete.sim
./test.sh -f general/db/delete_reuse1.sim
./test.sh -f general/db/delete_reuse2.sim
./test.sh -f general/db/delete_reusevnode.sim
......
......@@ -34,6 +34,11 @@ while $i < 2000
$i = $i + 1
endw
sql show db.vgroups
if $rows != 2 then
return -1
endi
print ======== step2
sleep 1000
sql drop database db
......@@ -59,14 +64,14 @@ endi
print ======== step3
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec_up.sh -n dnode1 -s stop -x SIGINT
system sh/exec_up.sh -n dnode2 -s stop -x SIGINT
system sh/exec_up.sh -n dnode3 -s stop -x SIGINT
sleep 1000
system sh/exec.sh -n dnode1 -s start -t
system sh/exec.sh -n dnode2 -s start -t
system sh/exec.sh -n dnode3 -s start -t
system sh/exec_up.sh -n dnode1 -s start
system sh/exec_up.sh -n dnode2 -s start
system sh/exec_up.sh -n dnode3 -s start
$x = 0
step3:
......
......@@ -31,17 +31,17 @@ system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
print ========= start dnodes
system sh/exec_up.sh -n dnode1 -s start
sleep 3000
sql connect
sql create dnode $hostname2
system sh/exec_up.sh -n dnode2 -s start
sleep 3000
$loop = 0
begin:
$db = db . $loop
print ======== step1
print ======== step1 $loop
sql create database $db
sql use $db
......@@ -53,15 +53,16 @@ begin:
$x = $x + 1
endw
print ======== step2
print ======== step2 $loop
system sh/exec_up.sh -n dnode2 -s stop
sleep 1000
sql drop database $db
print ======== step3
print ======== step3 $loop
sleep 3000
system sh/exec_up.sh -n dnode2 -s start
sleep 20000
sleep 5000
print ===> test times : $loop
if $loop > 5 then
......@@ -69,6 +70,9 @@ begin:
endi
$loop = $loop + 1
sql reset query cache
sleep 1000
goto begin
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册