提交 c5e33ad6 编写于 作者: sangshuduo's avatar sangshuduo

merge with master branch.

......@@ -265,12 +265,12 @@ pipeline {
}
}
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_crash_gen_val_log.sh
'''
// sh '''
// cd ${WKC}/tests/pytest
// rm -rf /var/lib/taos/*
// rm -rf /var/log/taos/*
// ./handle_crash_gen_val_log.sh
// '''
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
......
......@@ -1778,6 +1778,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
}
_error:
pParentSql->res.code = code;
tfree(tokenBuf);
tfree(line);
taos_free_result(pSql);
......
......@@ -86,6 +86,10 @@ typedef struct STscStmt {
return _code; \
} while (0)
#define STMT_CHECK if (pStmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) { \
STMT_RET(TSDB_CODE_TSC_DISCONNECTED); \
}
static int32_t invalidOperationMsg(char* dstBuffer, const char* errMsg) {
return tscInvalidOperationMsg(dstBuffer, errMsg, NULL);
}
......@@ -155,6 +159,22 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
var->i64 = *(int64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UTINYINT:
var->u64 = *(uint8_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_USMALLINT:
var->u64 = *(uint16_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UINT:
var->u64 = *(uint32_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_UBIGINT:
var->u64 = *(uint64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_FLOAT:
var->dKey = GET_FLOAT_VAL(tb->buffer);
break;
......@@ -261,9 +281,17 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
taosStringBuilderAppendInteger(&sb, var->i64);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
taosStringBuilderAppendUnsignedInteger(&sb, var->u64);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
taosStringBuilderAppendDouble(&sb, var->dKey);
......@@ -1501,9 +1529,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (sql == NULL) {
tscError("sql is NULL");
......@@ -1580,9 +1606,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
SSqlObj* pSql = pStmt->pSql;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1742,6 +1766,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
pStmt->mtb.subSet = true;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
......@@ -1750,6 +1775,7 @@ int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
pStmt->mtb.subSet = false;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
......@@ -1757,6 +1783,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
if (!pStmt->isInsert) {
SNormalStmt* normal = &pStmt->normal;
if (normal->params != NULL) {
......@@ -1785,16 +1812,14 @@ int taos_stmt_close(TAOS_STMT* stmt) {
}
}
taos_free_result(pStmt->pSql);
tscFreeSqlObj(pStmt->pSql);
tfree(pStmt);
STMT_RET(TSDB_CODE_SUCCESS);
}
int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->multiTbInsert) {
......@@ -1823,9 +1848,7 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
......@@ -1856,9 +1879,7 @@ int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX || colIdx < 0) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
......@@ -1891,9 +1912,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
int taos_stmt_add_batch(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->last != STMT_BIND && pStmt->last != STMT_BIND_COL) {
......@@ -1920,9 +1939,7 @@ int taos_stmt_reset(TAOS_STMT* stmt) {
int taos_stmt_execute(TAOS_STMT* stmt) {
int ret = 0;
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
if (pStmt->last != STMT_ADD_BATCH) {
......@@ -1968,18 +1985,14 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
tscError("result has been used already.");
return NULL;
}
TAOS_RES* result = pStmt->pSql;
pStmt->pSql = NULL;
return result;
}
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (insert) *insert = pStmt->isInsert;
......@@ -1989,9 +2002,7 @@ int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
SSqlObj* pSql = pStmt->pSql;
......@@ -2008,9 +2019,7 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
STMT_CHECK
if (pStmt->isInsert) {
SSqlCmd* pCmd = &pStmt->pSql->cmd;
......
......@@ -2609,6 +2609,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS_DUMMY].name, sizeof(pExpr->base.aliasName));
SColumnList ids = createColumnList(1, 0, 0);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
......@@ -8449,6 +8450,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
pSub->udfCopy = true;
pSub->pDownstream = pQueryInfo;
taosArrayPush(pQueryInfo->pUpstream, &pSub);
int32_t code = validateSqlNode(pSql, p, pSub);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -8472,8 +8474,6 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1);
}
taosArrayPush(pQueryInfo->pUpstream, &pSub);
// NOTE: order mix up in subquery not support yet.
pQueryInfo->order = pSub->order;
......
......@@ -2978,7 +2978,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
pParentSql->self, pSql->self, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
if (num > tsMaxNumOfOrderedResults && /*tscIsProjectionQueryOnSTable(pQueryInfo, 0) &&*/ !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) {
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
......
......@@ -60,6 +60,21 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
case TSDB_DATA_TYPE_TIMESTAMP:
n = sprintf(str, "%" PRId64, *(int64_t*)buf);
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%d", *(uint8_t*)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
n = sprintf(str, "%d", *(uint16_t*)buf);
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%d", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRId64, *(uint64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%f", GET_FLOAT_VAL(buf));
......
......@@ -84,7 +84,7 @@ int8_t tsTscEnableRecordSql = 0;
// the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp
int32_t tsMaxNumOfOrderedResults = 100000;
int32_t tsMaxNumOfOrderedResults = 1000000;
// 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10;
......@@ -1017,8 +1017,8 @@ static void doInitGlobalConfig(void) {
cfg.ptr = &tsMaxNumOfOrderedResults;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MAX_SQL_LEN;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN;
cfg.minValue = 100000;
cfg.maxValue = 100000000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
......@@ -1217,10 +1217,10 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "topicBianryLen";
cfg.option = "topicBinaryLen";
cfg.ptr = &tsTopicBianryLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 16;
cfg.maxValue = 16000;
cfg.ptrLength = 0;
......
此差异已折叠。
......@@ -225,16 +225,15 @@ static struct argp_option options[] = {
{"password", 'p', 0, 0, "User password to connect to server. Default is taosdata.", 0},
#endif
{"port", 'P', "PORT", 0, "Port to connect", 0},
{"cversion", 'v', "CVERION", 0, "client version", 0},
{"mysqlFlag", 'q', "MYSQLFLAG", 0, "mysqlFlag, Default is 0", 0},
// input/output file
{"outpath", 'o', "OUTPATH", 0, "Output file path.", 1},
{"inpath", 'i', "INPATH", 0, "Input file path.", 1},
{"resultFile", 'r', "RESULTFILE", 0, "DumpOut/In Result file path and name.", 1},
#ifdef _TD_POWER_
{"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/power/taos.cfg.", 1},
{"config-dir", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/power/taos.cfg.", 1},
#else
{"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/taos/taos.cfg.", 1},
{"config-dir", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/taos/taos.cfg.", 1},
#endif
{"encode", 'e', "ENCODE", 0, "Input file encoding.", 1},
// dump unit options
......@@ -244,7 +243,7 @@ static struct argp_option options[] = {
// dump format options
{"schemaonly", 's', 0, 0, "Only dump schema.", 2},
{"without-property", 'N', 0, 0, "Dump schema without properties.", 2},
{"avro", 'V', 0, 0, "Dump apache avro format data file. By default, dump sql command sequence.", 2},
{"avro", 'v', 0, 0, "Dump apache avro format data file. By default, dump sql command sequence.", 2},
{"start-time", 'S', "START_TIME", 0, "Start time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T00:00:00.000+0800 or 2017-10-0100:00:00:000+0800 or '2017-10-01 00:00:00.000+0800'", 4},
{"end-time", 'E', "END_TIME", 0, "End time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T00:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 5},
#if TSDB_SUPPORT_NANOSECOND == 1
......@@ -267,7 +266,6 @@ typedef struct arguments {
char *user;
char password[SHELL_MAX_PASSWORD_LEN];
uint16_t port;
char cversion[12];
uint16_t mysqlFlag;
// output file
char outpath[MAX_FILE_NAME_LEN];
......@@ -338,7 +336,6 @@ struct arguments g_args = {
"taosdata",
#endif
0,
"",
0,
// outpath and inpath
"",
......@@ -379,6 +376,15 @@ static void errorPrintReqArg2(char *program, char *wrong_arg)
"Try `taosdump --help' or `taosdump --usage' for more information.\n");
}
static void errorPrintReqArg3(char *program, char *wrong_arg)
{
fprintf(stderr,
"%s: option '%s' requires an argument\n",
program, wrong_arg);
fprintf(stderr,
"Try `taosdump --help' or `taosdump --usage' for more information.\n");
}
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
......@@ -408,15 +414,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'q':
g_args.mysqlFlag = atoi(arg);
break;
case 'v':
if (wordexp(arg, &full_path, 0) != 0) {
errorPrint("Invalid client vesion %s\n", arg);
return -1;
}
tstrncpy(g_args.cversion, full_path.we_wordv[0], 11);
wordfree(&full_path);
break;
// output file path
case 'o':
if (wordexp(arg, &full_path, 0) != 0) {
errorPrint("Invalid path %s\n", arg);
......@@ -443,9 +440,13 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
g_args.resultFile = arg;
break;
case 'c':
if (0 == strlen(arg)) {
errorPrintReqArg3("taosdump", "-c or --config-dir");
exit(EXIT_FAILURE);
}
if (wordexp(arg, &full_path, 0) != 0) {
errorPrint("Invalid path %s\n", arg);
return -1;
exit(EXIT_FAILURE);
}
tstrncpy(configDir, full_path.we_wordv[0], MAX_FILE_NAME_LEN);
wordfree(&full_path);
......@@ -466,7 +467,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'N':
g_args.with_property = false;
break;
case 'V':
case 'v':
g_args.avro = true;
break;
case 'S':
......@@ -673,6 +674,9 @@ static void parse_timestamp(
}
int main(int argc, char *argv[]) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
argp_program_version = verType;
int ret = 0;
/* Parse our arguments; every option seen by parse_opt will be
......@@ -699,7 +703,6 @@ int main(int argc, char *argv[]) {
printf("user: %s\n", g_args.user);
printf("password: %s\n", g_args.password);
printf("port: %u\n", g_args.port);
printf("cversion: %s\n", g_args.cversion);
printf("mysqlFlag: %d\n", g_args.mysqlFlag);
printf("outpath: %s\n", g_args.outpath);
printf("inpath: %s\n", g_args.inpath);
......@@ -728,11 +731,6 @@ int main(int argc, char *argv[]) {
}
}
printf("==============================\n");
if (g_args.cversion[0] != 0){
tstrncpy(version, g_args.cversion, 11);
}
if (taosCheckParam(&g_args) < 0) {
exit(EXIT_FAILURE);
}
......@@ -750,7 +748,6 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "user: %s\n", g_args.user);
fprintf(g_fpOfResult, "password: %s\n", g_args.password);
fprintf(g_fpOfResult, "port: %u\n", g_args.port);
fprintf(g_fpOfResult, "cversion: %s\n", g_args.cversion);
fprintf(g_fpOfResult, "mysqlFlag: %d\n", g_args.mysqlFlag);
fprintf(g_fpOfResult, "outpath: %s\n", g_args.outpath);
fprintf(g_fpOfResult, "inpath: %s\n", g_args.inpath);
......
......@@ -4032,11 +4032,23 @@ static void irate_function(SQLFunctionCtx *pCtx) {
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
if ((INT64_MIN == pRateInfo->lastKey) || primaryKey[i] > pRateInfo->lastKey) {
if (INT64_MIN == pRateInfo->lastKey) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
continue;
}
if (primaryKey[i] > pRateInfo->lastKey) {
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
pRateInfo->firstValue = pRateInfo->lastValue;
pRateInfo->firstKey = pRateInfo->lastKey;
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
continue;
}
if ((INT64_MIN == pRateInfo->firstKey) || primaryKey[i] > pRateInfo->firstKey) {
pRateInfo->firstValue = v;
......
......@@ -67,10 +67,18 @@ static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, d
if (IS_SIGNED_NUMERIC_TYPE(type)) {
range->i64MinVal = (int64_t) minval;
range->i64MaxVal = (int64_t) maxval;
if (maxval > INT64_MAX || (int64_t)maxval == INT64_MIN) {
range->i64MaxVal = INT64_MAX;
} else {
range->i64MaxVal = (int64_t) maxval;
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)){
range->u64MinVal = (uint64_t) minval;
range->u64MaxVal = (uint64_t) maxval;
if ((uint64_t)maxval > UINT64_MAX) {
range->u64MaxVal = UINT64_MAX;
} else {
range->u64MaxVal = (uint64_t) maxval;
}
} else {
range->dMinVal = minval;
range->dMaxVal = maxval;
......@@ -127,8 +135,8 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
index = (delta % pBucket->numOfSlots);
} else {
double slotSpan = (double)span / pBucket->numOfSlots;
index = (int32_t)((v - pBucket->range.i64MinVal) / slotSpan);
if (v == pBucket->range.i64MaxVal) {
index = (int32_t)(((double)v - pBucket->range.i64MinVal) / slotSpan);
if (index == pBucket->numOfSlots) {
index -= 1;
}
}
......
......@@ -375,6 +375,16 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
sz = fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
if (decomp) {
if (pBlock->numOfElem * TSDB_KEYSIZE > pTSBuf->tsData.allocSize) {
pTSBuf->tsData.rawBuf = realloc(pTSBuf->tsData.rawBuf, pBlock->numOfElem * TSDB_KEYSIZE);
pTSBuf->tsData.allocSize = pBlock->numOfElem * TSDB_KEYSIZE;
}
if (pBlock->numOfElem * TSDB_KEYSIZE > pTSBuf->bufSize) {
pTSBuf->assistBuf = realloc(pTSBuf->assistBuf, pBlock->numOfElem * TSDB_KEYSIZE);
pTSBuf->bufSize = pBlock->numOfElem * TSDB_KEYSIZE;
}
pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
......@@ -471,7 +481,7 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, i
// the size of raw data exceeds the size of the default prepared buffer, so
// during getBufBlock, the output buffer needs to be large enough.
if (ptsData->len >= ptsData->threshold) {
if (ptsData->len >= ptsData->threshold - TSDB_KEYSIZE) {
writeDataToDisk(pTSBuf);
shrinkBuffer(ptsData);
}
......@@ -603,6 +613,10 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
expandBuffer(&pTSBuf->tsData, (int32_t)s);
}
if (s > pTSBuf->bufSize) {
pTSBuf->assistBuf = realloc(pTSBuf->assistBuf, s);
pTSBuf->bufSize = (int32_t)s;
}
pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
......
......@@ -43,6 +43,7 @@ void taosStringBuilderAppendStringLen(SStringBuilder* sb, const char* str, size_
void taosStringBuilderAppendString(SStringBuilder* sb, const char* str);
void taosStringBuilderAppendNull(SStringBuilder* sb);
void taosStringBuilderAppendInteger(SStringBuilder* sb, int64_t v);
void taosStringBuilderAppendUnsignedInteger(SStringBuilder* sb, uint64_t v);
void taosStringBuilderAppendDouble(SStringBuilder* sb, double v);
#ifdef __cplusplus
......
......@@ -73,6 +73,12 @@ void taosStringBuilderAppendInteger(SStringBuilder* sb, int64_t v) {
taosStringBuilderAppendStringLen(sb, buf, MIN(len, sizeof(buf)));
}
void taosStringBuilderAppendUnsignedInteger(SStringBuilder* sb, uint64_t v) {
char buf[64];
size_t len = snprintf(buf, sizeof(buf), "%" PRId64, v);
taosStringBuilderAppendStringLen(sb, buf, MIN(len, sizeof(buf)));
}
void taosStringBuilderAppendDouble(SStringBuilder* sb, double v) {
char buf[512];
size_t len = snprintf(buf, sizeof(buf), "%.9lf", v);
......
此差异已折叠。
......@@ -20,9 +20,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <taos.h>
......@@ -33,14 +33,14 @@ int tablesSelectProcessed = 0;
int64_t st, et;
typedef struct {
int id;
TAOS *taos;
char name[16];
time_t timeStamp;
int value;
int rowsInserted;
int rowsTried;
int rowsRetrieved;
int id;
TAOS * taos;
char name[16];
time_t timeStamp;
int value;
int rowsInserted;
int rowsTried;
int rowsRetrieved;
} STable;
void taos_insert_call_back(void *param, TAOS_RES *tres, int code);
......@@ -48,7 +48,7 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code);
void taos_error(TAOS *taos);
static void queryDB(TAOS *taos, char *command) {
int i;
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
......@@ -57,12 +57,12 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
}
if (code != 0) {
......@@ -76,15 +76,14 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql);
}
int main(int argc, char *argv[])
{
TAOS *taos;
struct timeval systemTime;
int i;
char sql[1024] = { 0 };
char prefix[20] = { 0 };
char db[128] = { 0 };
STable *tableList;
int main(int argc, char *argv[]) {
TAOS * taos;
struct timeval systemTime;
int i;
char sql[1024] = {0};
char prefix[20] = {0};
char db[128] = {0};
STable * tableList;
if (argc != 5) {
printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]);
......@@ -101,8 +100,7 @@ int main(int argc, char *argv[])
memset(tableList, 0, size);
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
if (taos == NULL) taos_error(taos);
printf("success to connect to server\n");
......@@ -122,7 +120,7 @@ int main(int argc, char *argv[])
sprintf(tableList[i].name, "%s%d", prefix, i);
sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i);
queryDB(taos, sql);
}
}
gettimeofday(&systemTime, NULL);
for (i = 0; i < numOfTables; ++i)
......@@ -138,7 +136,7 @@ int main(int argc, char *argv[])
tablesInsertProcessed = 0;
tablesSelectProcessed = 0;
for (i = 0; i<numOfTables; ++i) {
for (i = 0; i < numOfTables; ++i) {
// insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
taos_query_a(taos, sql, taos_insert_call_back, (void *)(tableList + i));
......@@ -147,12 +145,12 @@ int main(int argc, char *argv[])
printf("once insert finished, presse any key to query\n");
getchar();
while(1) {
while (1) {
if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
......@@ -161,9 +159,8 @@ int main(int argc, char *argv[])
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API
// select records in asynchronous API
sprintf(sql, "select * from %s", tableList[i].name);
taos_query_a(taos, sql, taos_select_call_back, (void *)(tableList + i));
}
......@@ -171,17 +168,17 @@ int main(int argc, char *argv[])
printf("\nonce finished, press any key to exit\n");
getchar();
while(1) {
while (1) {
if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
for (i = 0; i<numOfTables; ++i) {
for (i = 0; i < numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
......@@ -193,60 +190,54 @@ int main(int argc, char *argv[])
return 0;
}
void taos_error(TAOS *con)
{
void taos_error(TAOS *con) {
fprintf(stderr, "TDengine error: %s\n", taos_errstr(con));
taos_close(con);
taos_cleanup();
exit(1);
}
void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
{
STable *pTable = (STable *)param;
struct timeval systemTime;
char sql[128];
void taos_insert_call_back(void *param, TAOS_RES *tres, int code) {
STable * pTable = (STable *)param;
struct timeval systemTime;
char sql[128];
pTable->rowsTried++;
if (code < 0) {
if (code < 0) {
printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried);
}
else if (code == 0) {
} else if (code == 0) {
printf("%s not inserted\n", pTable->name);
}
else {
} else {
pTable->rowsInserted++;
}
if (pTable->rowsTried < points) {
// for this demo, insert another record
sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000+pTable->rowsTried*1000, pTable->rowsTried);
sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000 + pTable->rowsTried * 1000,
pTable->rowsTried);
taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable);
}
else {
} else {
printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesInsertProcessed++;
if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points * numOfTables);
}
}
taos_free_result(tres);
}
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
{
STable *pTable = (STable *)param;
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) {
STable * pTable = (STable *)param;
struct timeval systemTime;
if (numOfRows > 0) {
for (int i = 0; i<numOfRows; ++i) {
for (int i = 0; i < numOfRows; ++i) {
// synchronous API to retrieve a row from batch of records
/*TAOS_ROW row = */(void)taos_fetch_row(tres);
/*TAOS_ROW row = */ (void)taos_fetch_row(tres);
// process row
}
......@@ -255,12 +246,10 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
// retrieve next batch of rows
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
}
else {
if (numOfRows < 0)
printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
} else {
if (numOfRows < 0) printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
//taos_free_result(tres);
// taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesSelectProcessed++;
......@@ -272,19 +261,15 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
taos_free_result(tres);
}
}
void taos_select_call_back(void *param, TAOS_RES *tres, int code)
{
void taos_select_call_back(void *param, TAOS_RES *tres, int code) {
STable *pTable = (STable *)param;
if (code == 0 && tres) {
// asynchronous API to fetch a batch of records
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
}
else {
} else {
printf("%s select failed, code:%d\n", pTable->name, code);
taos_free_result(tres);
taos_cleanup();
......
......@@ -16,14 +16,14 @@
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <taos.h> // TAOS header file
static void queryDB(TAOS *taos, char *command) {
int i;
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
......@@ -32,12 +32,12 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
}
if (code != 0) {
......@@ -53,7 +53,7 @@ static void queryDB(TAOS *taos, char *command) {
void Test(TAOS *taos, char *qstr, int i);
int main(int argc, char *argv[]) {
char qstr[1024];
char qstr[1024];
// connect to server
if (argc < 2) {
......@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) {
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
for (int i = 0; i < 100; i++) {
......@@ -72,28 +72,30 @@ int main(int argc, char *argv[]) {
taos_close(taos);
taos_cleanup();
}
void Test(TAOS *taos, char *qstr, int index) {
void Test(TAOS *taos, char *qstr, int index) {
printf("==================test at %d\n================================", index);
queryDB(taos, "drop database if exists demo");
queryDB(taos, "create database demo");
TAOS_RES *result;
queryDB(taos, "use demo");
queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
queryDB(taos,
"create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
printf("success to create table\n");
int i = 0;
for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello");
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')",
(uint64_t)(1546300800000 + i * 1000), i, i, i, i * 10000000, i * 1.0, i * 2.0, "hello");
printf("qstr: %s\n", qstr);
// note: how do you wanna do if taos_query returns non-NULL
// if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// }
TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1);
exit(1);
} else {
......@@ -107,7 +109,7 @@ void Test(TAOS *taos, char *qstr, int index) {
sprintf(qstr, "SELECT * FROM m1");
result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result));
printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result);
exit(1);
}
......@@ -130,4 +132,3 @@ void Test(TAOS *taos, char *qstr, int index) {
taos_free_result(result);
printf("====demo end====\n\n");
}
......@@ -21,103 +21,101 @@
#ifdef __APPLE__
#include "osEok.h"
#else // __APPLE__
#else // __APPLE__
#include <sys/epoll.h>
#endif // __APPLE__
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#endif // __APPLE__
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <libgen.h>
#include <locale.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#define A(statement, fmt, ...) do { \
if (statement) break; \
fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \
basename(__FILE__), __LINE__, __func__, \
#statement, errno, strerror(errno), \
##__VA_ARGS__); \
abort(); \
} while (0)
#define A(statement, fmt, ...) \
do { \
if (statement) break; \
fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", basename(__FILE__), __LINE__, __func__, \
#statement, errno, strerror(errno), ##__VA_ARGS__); \
abort(); \
} while (0)
#define E(fmt, ...) do { \
fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \
basename(__FILE__), __LINE__, __func__, \
errno, strerror(errno), \
##__VA_ARGS__); \
} while (0)
#define E(fmt, ...) \
do { \
fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", basename(__FILE__), __LINE__, __func__, errno, strerror(errno), \
##__VA_ARGS__); \
} while (0)
#include "os.h"
typedef struct ep_s ep_t;
typedef struct ep_s ep_t;
struct ep_s {
int ep;
int ep;
pthread_mutex_t lock;
int sv[2]; // 0 for read, 1 for write;
pthread_t thread;
pthread_mutex_t lock;
int sv[2]; // 0 for read, 1 for write;
pthread_t thread;
volatile unsigned int stopping:1;
volatile unsigned int waiting:1;
volatile unsigned int wakenup:1;
volatile unsigned int stopping : 1;
volatile unsigned int waiting : 1;
volatile unsigned int wakenup : 1;
};
static int ep_dummy = 0;
static ep_t* ep_create(void);
static ep_t *ep_create(void);
static void ep_destroy(ep_t *ep);
static void* routine(void* arg);
static int open_listen(unsigned short port);
static void *routine(void *arg);
static int open_listen(unsigned short port);
typedef struct fde_s fde_t;
typedef struct fde_s fde_t;
struct fde_s {
int skt;
int skt;
void (*on_event)(ep_t *ep, struct epoll_event *events, fde_t *client);
};
static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client);
static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client);
#define usage(arg0, fmt, ...) do { \
if (fmt[0]) { \
fprintf(stderr, "" fmt "\n", ##__VA_ARGS__); \
} \
fprintf(stderr, "usage:\n"); \
fprintf(stderr, " %s -l <port> : specify listenning port\n", arg0); \
} while (0)
#define usage(arg0, fmt, ...) \
do { \
if (fmt[0]) { \
fprintf(stderr, "" fmt "\n", ##__VA_ARGS__); \
} \
fprintf(stderr, "usage:\n"); \
fprintf(stderr, " %s -l <port> : specify listenning port\n", arg0); \
} while (0)
int main(int argc, char *argv[]) {
char *prg = basename(argv[0]);
if (argc==1) {
if (argc == 1) {
usage(prg, "");
return 0;
}
ep_t* ep = ep_create();
ep_t *ep = ep_create();
A(ep, "failed");
for (int i=1; i<argc; ++i) {
for (int i = 1; i < argc; ++i) {
const char *arg = argv[i];
if (0==strcmp(arg, "-l")) {
if (0 == strcmp(arg, "-l")) {
++i;
if (i>=argc) {
if (i >= argc) {
usage(prg, "expecting <port> after -l, but got nothing");
return 1; // confirmed potential leakage
return 1; // confirmed potential leakage
}
arg = argv[i];
int port = atoi(arg);
int skt = open_listen(port);
if (skt==-1) continue;
fde_t *client = (fde_t*)calloc(1, sizeof(*client));
if (skt == -1) continue;
fde_t *client = (fde_t *)calloc(1, sizeof(*client));
if (!client) {
E("out of memory");
close(skt);
......@@ -126,32 +124,32 @@ int main(int argc, char *argv[]) {
client->skt = skt;
client->on_event = listen_event;
struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = client;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
A(0 == epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
continue;
}
usage(prg, "unknown argument: [%s]", arg);
return 1;
}
char *line = NULL;
size_t linecap = 0;
char * line = NULL;
size_t linecap = 0;
ssize_t linelen;
while ((linelen = getline(&line, &linecap, stdin)) > 0) {
line[strlen(line)-1] = '\0';
if (0==strcmp(line, "exit")) break;
if (0==strcmp(line, "quit")) break;
if (line==strstr(line, "close")) {
line[strlen(line) - 1] = '\0';
if (0 == strcmp(line, "exit")) break;
if (0 == strcmp(line, "quit")) break;
if (line == strstr(line, "close")) {
int fd = 0;
sscanf(line, "close %d", &fd);
if (fd<=2) {
if (fd <= 2) {
fprintf(stderr, "fd [%d] invalid\n", fd);
continue;
}
A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), "");
A(0 == epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), "");
continue;
}
if (strlen(line)==0) continue;
if (strlen(line) == 0) continue;
fprintf(stderr, "unknown cmd:[%s]\n", line);
}
ep_destroy(ep);
......@@ -159,69 +157,69 @@ int main(int argc, char *argv[]) {
return 0;
}
ep_t* ep_create(void) {
ep_t *ep = (ep_t*)calloc(1, sizeof(*ep));
ep_t *ep_create(void) {
ep_t *ep = (ep_t *)calloc(1, sizeof(*ep));
A(ep, "out of memory");
A(-1!=(ep->ep = epoll_create(1)), "");
A(-1 != (ep->ep = epoll_create(1)), "");
ep->sv[0] = -1;
ep->sv[1] = -1;
A(0==socketpair(AF_LOCAL, SOCK_STREAM, 0, ep->sv), "");
A(0==pthread_mutex_init(&ep->lock, NULL), "");
A(0==pthread_mutex_lock(&ep->lock), "");
A(0 == socketpair(AF_LOCAL, SOCK_STREAM, 0, ep->sv), "");
A(0 == pthread_mutex_init(&ep->lock, NULL), "");
A(0 == pthread_mutex_lock(&ep->lock), "");
struct epoll_event ev = {0};
ev.events = EPOLLIN;
ev.events = EPOLLIN;
ev.data.ptr = &ep_dummy;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, ep->sv[0], &ev), "");
A(0==pthread_create(&ep->thread, NULL, routine, ep), "");
A(0==pthread_mutex_unlock(&ep->lock), "");
A(0 == epoll_ctl(ep->ep, EPOLL_CTL_ADD, ep->sv[0], &ev), "");
A(0 == pthread_create(&ep->thread, NULL, routine, ep), "");
A(0 == pthread_mutex_unlock(&ep->lock), "");
return ep;
}
static void ep_destroy(ep_t *ep) {
A(ep, "invalid argument");
ep->stopping = 1;
A(1==send(ep->sv[1], "1", 1, 0), "");
A(0==pthread_join(ep->thread, NULL), "");
A(0==pthread_mutex_destroy(&ep->lock), "");
A(0==close(ep->sv[0]), "");
A(0==close(ep->sv[1]), "");
A(0==close(ep->ep), "");
A(1 == send(ep->sv[1], "1", 1, 0), "");
A(0 == pthread_join(ep->thread, NULL), "");
A(0 == pthread_mutex_destroy(&ep->lock), "");
A(0 == close(ep->sv[0]), "");
A(0 == close(ep->sv[1]), "");
A(0 == close(ep->ep), "");
free(ep);
}
static void* routine(void* arg) {
static void *routine(void *arg) {
A(arg, "invalid argument");
ep_t *ep = (ep_t*)arg;
ep_t *ep = (ep_t *)arg;
while (!ep->stopping) {
struct epoll_event evs[10];
memset(evs, 0, sizeof(evs));
A(0==pthread_mutex_lock(&ep->lock), "");
A(ep->waiting==0, "internal logic error");
A(0 == pthread_mutex_lock(&ep->lock), "");
A(ep->waiting == 0, "internal logic error");
ep->waiting = 1;
A(0==pthread_mutex_unlock(&ep->lock), "");
A(0 == pthread_mutex_unlock(&ep->lock), "");
int r = epoll_wait(ep->ep, evs, sizeof(evs)/sizeof(evs[0]), -1);
A(r>0, "indefinite epoll_wait shall not timeout:[%d]", r);
int r = epoll_wait(ep->ep, evs, sizeof(evs) / sizeof(evs[0]), -1);
A(r > 0, "indefinite epoll_wait shall not timeout:[%d]", r);
A(0==pthread_mutex_lock(&ep->lock), "");
A(ep->waiting==1, "internal logic error");
A(0 == pthread_mutex_lock(&ep->lock), "");
A(ep->waiting == 1, "internal logic error");
ep->waiting = 0;
A(0==pthread_mutex_unlock(&ep->lock), "");
A(0 == pthread_mutex_unlock(&ep->lock), "");
for (int i=0; i<r; ++i) {
for (int i = 0; i < r; ++i) {
struct epoll_event *ev = evs + i;
if (ev->data.ptr == &ep_dummy) {
char c = '\0';
A(1==recv(ep->sv[0], &c, 1, 0), "internal logic error");
A(0==pthread_mutex_lock(&ep->lock), "");
A(1 == recv(ep->sv[0], &c, 1, 0), "internal logic error");
A(0 == pthread_mutex_lock(&ep->lock), "");
ep->wakenup = 0;
A(0==pthread_mutex_unlock(&ep->lock), "");
A(0 == pthread_mutex_unlock(&ep->lock), "");
continue;
}
A(ev->data.ptr, "internal logic error");
fde_t *client = (fde_t*)ev->data.ptr;
fde_t *client = (fde_t *)ev->data.ptr;
client->on_event(ep, ev, client);
continue;
}
......@@ -232,7 +230,7 @@ static void* routine(void* arg) {
static int open_listen(unsigned short port) {
int r = 0;
int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (skt==-1) {
if (skt == -1) {
E("socket() failed");
return -1;
}
......@@ -241,7 +239,7 @@ static int open_listen(unsigned short port) {
si.sin_family = AF_INET;
si.sin_addr.s_addr = inet_addr("0.0.0.0");
si.sin_port = htons(port);
r = bind(skt, (struct sockaddr*)&si, sizeof(si));
r = bind(skt, (struct sockaddr *)&si, sizeof(si));
if (r) {
E("bind(%u) failed", port);
break;
......@@ -257,7 +255,7 @@ static int open_listen(unsigned short port) {
if (r) {
E("getsockname() failed");
}
A(len==sizeof(si), "internal logic error");
A(len == sizeof(si), "internal logic error");
D("listenning at: %d", ntohs(si.sin_port));
return skt;
} while (0);
......@@ -268,10 +266,10 @@ static int open_listen(unsigned short port) {
static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
A(ev->events & EPOLLIN, "internal logic error");
struct sockaddr_in si = {0};
socklen_t silen = sizeof(si);
int skt = accept(client->skt, (struct sockaddr*)&si, &silen);
A(skt!=-1, "internal logic error");
fde_t *server = (fde_t*)calloc(1, sizeof(*server));
socklen_t silen = sizeof(si);
int skt = accept(client->skt, (struct sockaddr *)&si, &silen);
A(skt != -1, "internal logic error");
fde_t *server = (fde_t *)calloc(1, sizeof(*server));
if (!server) {
close(skt);
return;
......@@ -279,26 +277,25 @@ static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
server->skt = skt;
server->on_event = null_event;
struct epoll_event ee = {0};
ee.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ee.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ee.data.ptr = server;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ee), "");
A(0 == epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ee), "");
}
static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
if (ev->events & EPOLLIN) {
char buf[8192];
int n = recv(client->skt, buf, sizeof(buf), 0);
A(n>=0 && n<=sizeof(buf), "internal logic error:[%d]", n);
A(n==fwrite(buf, 1, n, stdout), "internal logic error");
int n = recv(client->skt, buf, sizeof(buf), 0);
A(n >= 0 && n <= sizeof(buf), "internal logic error:[%d]", n);
A(n == fwrite(buf, 1, n, stdout), "internal logic error");
}
if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
A(0==pthread_mutex_lock(&ep->lock), "");
A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), "");
A(0==pthread_mutex_unlock(&ep->lock), "");
A(0 == pthread_mutex_lock(&ep->lock), "");
A(0 == epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), "");
A(0 == pthread_mutex_unlock(&ep->lock), "");
close(client->skt);
client->skt = -1;
client->on_event = NULL;
free(client);
}
}
......@@ -7,6 +7,7 @@ LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
-Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX \
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99
-fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment
all: $(TARGET)
......@@ -19,10 +20,9 @@ exe:
gcc $(CFLAGS) ./apitest.c -o $(ROOT)apitest $(LFLAGS)
clean:
rm $(ROOT)asyncdemo
rm $(ROOT)demo
rm $(ROOT)prepare
rm $(ROOT)batchprepare
rm $(ROOT)stream
rm $(ROOT)subscribe
rm $(ROOT)apitest
rm -f $(ROOT)asyncdemo
rm -f $(ROOT)demo
rm -f $(ROOT)prepare
rm -f $(ROOT)stream
rm -f $(ROOT)subscribe
rm -f $(ROOT)apitest
此差异已折叠。
#include "os.h"
#include "taos.h"
#include "taoserror.h"
#include "os.h"
#include <stdio.h>
#include <stdlib.h>
......@@ -12,15 +12,12 @@ int numSuperTables = 8;
int numChildTables = 4;
int numRowsPerChildTable = 2048;
void shuffle(char**lines, size_t n)
{
if (n > 1)
{
void shuffle(char** lines, size_t n) {
if (n > 1) {
size_t i;
for (i = 0; i < n - 1; i++)
{
for (i = 0; i < n - 1; i++) {
size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j];
char* t = lines[j];
lines[j] = lines[i];
lines[i] = t;
}
......@@ -34,7 +31,7 @@ static int64_t getTimeInUs() {
}
int main(int argc, char* argv[]) {
TAOS_RES *result;
TAOS_RES* result;
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
......@@ -59,12 +56,16 @@ int main(int argc, char* argv[]) {
(void)taos_select_db(taos, "db");
time_t ct = time(0);
time_t ct = time(0);
int64_t ts = ct * 1000;
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char* lineFormat =
"sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11="
"\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0;
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
......@@ -78,121 +79,142 @@ int main(int argc, char* argv[]) {
shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines");
int64_t begin = getTimeInUs();
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable);
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin);
printf("code: %d, %s. time used: %" PRId64 "\n", code, tstrerror(code), end - begin);
char* lines_000_0[] = {
"sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us"
};
code = taos_insert_lines(taos, lines_000_0 , sizeof(lines_000_0)/sizeof(char*));
"sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7="
"2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12="
"L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" "
"1626006833639000us"};
code = taos_insert_lines(taos, lines_000_0, sizeof(lines_000_0) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_0 should return error\n");
return -1;
}
char* lines_000_1[] = {
"sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"
};
code = taos_insert_lines(taos, lines_000_1 , sizeof(lines_000_1)/sizeof(char*));
"sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,"
"t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12="
"L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" "
"1626006833639001"};
code = taos_insert_lines(taos, lines_000_1, sizeof(lines_000_1) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_1 should return error\n");
return -1;
}
char* lines_000_2[] = {
"sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"
};
"sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
"22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"};
code = taos_insert_lines(taos, lines_000_2 , sizeof(lines_000_2)/sizeof(char*));
code = taos_insert_lines(taos, lines_000_2, sizeof(lines_000_2) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_0[] = {
"sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us",
"sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us",
};
code = taos_insert_lines(taos, lines_001_0 , sizeof(lines_001_0)/sizeof(char*));
code = taos_insert_lines(taos, lines_001_0, sizeof(lines_001_0) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_1[] = {
"sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"
};
"sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
"22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"};
code = taos_insert_lines(taos, lines_001_1 , sizeof(lines_001_1)/sizeof(char*));
code = taos_insert_lines(taos, lines_001_1, sizeof(lines_001_1) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_2[] = {
"sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"
};
"sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
"22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 0"};
code = taos_insert_lines(taos, lines_001_2 , sizeof(lines_001_2)/sizeof(char*));
code = taos_insert_lines(taos, lines_001_2, sizeof(lines_001_2) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_002[] = {
"stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006834s"
};
code = taos_insert_lines(taos, lines_002 , sizeof(lines_002)/sizeof(char*));
"stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006834s"};
code = taos_insert_lines(taos, lines_002, sizeof(lines_002) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
//Duplicate key check;
char* lines_003_1[] = {
"std,id=\"std_3_1\",t1=4i64,Id=\"std\",t2=true c1=true 1626006834s"
};
// Duplicate key check;
char* lines_003_1[] = {"std,id=\"std_3_1\",t1=4i64,Id=\"std\",t2=true c1=true 1626006834s"};
code = taos_insert_lines(taos, lines_003_1 , sizeof(lines_003_1)/sizeof(char*));
code = taos_insert_lines(taos, lines_003_1, sizeof(lines_003_1) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_003_2[] = {
"std,id=\"std_3_2\",tag1=4i64,Tag2=true,tAg3=2,TaG2=\"dup!\" c1=true 1626006834s"
};
char* lines_003_2[] = {"std,id=\"std_3_2\",tag1=4i64,Tag2=true,tAg3=2,TaG2=\"dup!\" c1=true 1626006834s"};
code = taos_insert_lines(taos, lines_003_2 , sizeof(lines_003_2)/sizeof(char*));
code = taos_insert_lines(taos, lines_003_2, sizeof(lines_003_2) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_003_3[] = {
"std,id=\"std_3_3\",tag1=4i64 field1=true,Field2=2,FIElD1=\"dup!\",fIeLd4=true 1626006834s"
};
char* lines_003_3[] = {"std,id=\"std_3_3\",tag1=4i64 field1=true,Field2=2,FIElD1=\"dup!\",fIeLd4=true 1626006834s"};
code = taos_insert_lines(taos, lines_003_3 , sizeof(lines_003_3)/sizeof(char*));
code = taos_insert_lines(taos, lines_003_3, sizeof(lines_003_3) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_3 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_003_4[] = {
"std,id=\"std_3_4\",tag1=4i64,dupkey=4i16,tag2=T field1=true,dUpkEy=1e3f32,field2=\"1234\" 1626006834s"
};
"std,id=\"std_3_4\",tag1=4i64,dupkey=4i16,tag2=T field1=true,dUpkEy=1e3f32,field2=\"1234\" 1626006834s"};
code = taos_insert_lines(taos, lines_003_4 , sizeof(lines_003_4)/sizeof(char*));
code = taos_insert_lines(taos, lines_003_4, sizeof(lines_003_4) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_4 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
......
......@@ -13,24 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <taos.h> // include TDengine header file
#include <unistd.h>
typedef struct {
char server_ip[64];
char db_name[64];
char tbl_name[64];
char server_ip[64];
char db_name[64];
char tbl_name[64];
} param;
int g_thread_exit_flag = 0;
void* insert_rows(void *sarg);
int g_thread_exit_flag = 0;
void *insert_rows(void *sarg);
void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row)
{
void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row) {
// in this simple demo, it just print out the result
char temp[128];
......@@ -42,85 +41,81 @@ void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row)
printf("\n%s\n", temp);
}
int main(int argc, char *argv[])
{
TAOS *taos;
char db_name[64];
char tbl_name[64];
char sql[1024] = { 0 };
int main(int argc, char *argv[]) {
TAOS *taos;
char db_name[64];
char tbl_name[64];
char sql[1024] = {0};
if (argc != 4) {
printf("usage: %s server-ip dbname tblname\n", argv[0]);
exit(0);
}
}
strcpy(db_name, argv[2]);
strcpy(tbl_name, argv[3]);
// create pthread to insert into row per second for stream calc
param *t_param = (param *)malloc(sizeof(param));
if (NULL == t_param)
{
if (NULL == t_param) {
printf("failed to malloc\n");
exit(1);
}
memset(t_param, 0, sizeof(param));
memset(t_param, 0, sizeof(param));
strcpy(t_param->server_ip, argv[1]);
strcpy(t_param->db_name, db_name);
strcpy(t_param->tbl_name, tbl_name);
pthread_t pid;
pthread_create(&pid, NULL, (void * (*)(void *))insert_rows, t_param);
pthread_create(&pid, NULL, (void *(*)(void *))insert_rows, t_param);
sleep(3); // waiting for database is created.
sleep(3); // waiting for database is created.
// open connection to database
taos = taos_connect(argv[1], "root", "taosdata", db_name, 0);
if (taos == NULL) {
printf("failed to connet to server:%s\n", argv[1]);
free(t_param);
free(t_param);
exit(1);
}
// starting stream calc,
// starting stream calc,
printf("please input stream SQL:[e.g., select count(*) from tblname interval(5s) sliding(2s);]\n");
fgets(sql, sizeof(sql), stdin);
if (sql[0] == 0) {
printf("input NULL stream SQL, so exit!\n");
printf("input NULL stream SQL, so exit!\n");
free(t_param);
exit(1);
}
// param is set to NULL in this demo, it shall be set to the pointer to app context
// param is set to NULL in this demo, it shall be set to the pointer to app context
TAOS_STREAM *pStream = taos_open_stream(taos, sql, streamCallBack, 0, NULL, NULL);
if (NULL == pStream) {
printf("failed to create stream\n");
printf("failed to create stream\n");
free(t_param);
exit(1);
}
printf("presss any key to exit\n");
getchar();
taos_close_stream(pStream);
g_thread_exit_flag = 1;
g_thread_exit_flag = 1;
pthread_join(pid, NULL);
taos_close(taos);
free(t_param);
free(t_param);
return 0;
}
void *insert_rows(void *sarg) {
TAOS * taos;
char command[1024] = {0};
param *winfo = (param *)sarg;
void* insert_rows(void *sarg)
{
TAOS *taos;
char command[1024] = { 0 };
param *winfo = (param * )sarg;
if (NULL == winfo){
printf("para is null!\n");
if (NULL == winfo) {
printf("para is null!\n");
exit(1);
}
......@@ -129,7 +124,7 @@ void* insert_rows(void *sarg)
printf("failed to connet to server:%s\n", winfo->server_ip);
exit(1);
}
// drop database
sprintf(command, "drop database %s;", winfo->db_name);
if (taos_query(taos, command) != 0) {
......@@ -160,19 +155,18 @@ void* insert_rows(void *sarg)
// insert data
int64_t begin = (int64_t)time(NULL);
int index = 0;
int index = 0;
while (1) {
if (g_thread_exit_flag) break;
index++;
sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index);
if (taos_query(taos, command)) {
printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos));
}
sleep(1);
}
}
taos_close(taos);
return 0;
}
......@@ -14,10 +14,10 @@ void print_result(TAOS_RES* res, int blockFetch) {
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
if (blockFetch) {
nRows = taos_fetch_block(res, &row);
//for (int i = 0; i < nRows; i++) {
// for (int i = 0; i < nRows; i++) {
// taos_print_row(buf, row + i, fields, num_fields);
// puts(buf);
//}
......@@ -34,15 +34,11 @@ void print_result(TAOS_RES* res, int blockFetch) {
printf("%d rows consumed.\n", nRows);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
print_result(res, *(int*)param);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) { print_result(res, *(int*)param); }
void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = 0;
TAOS_ROW row;
int actual = 0;
TAOS_ROW row;
while ((row = taos_fetch_row(res))) {
actual++;
}
......@@ -53,16 +49,14 @@ void check_row_count(int line, TAOS_RES* res, int expected) {
}
}
void do_query(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, sql);
taos_free_result(res);
}
void run_test(TAOS* taos) {
do_query(taos, "drop database if exists test;");
usleep(100000);
do_query(taos, "create database test;");
usleep(100000);
......@@ -161,14 +155,13 @@ void run_test(TAOS* taos) {
taos_unsubscribe(tsub, 0);
}
int main(int argc, char *argv[]) {
int main(int argc, char* argv[]) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
const char* sql = "select * from meters;";
const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-h=", 3) == 0) {
......@@ -240,20 +233,21 @@ int main(int argc, char *argv[]) {
if (tsub == NULL) {
printf("failed to create subscription.\n");
exit(0);
}
}
if (async) {
getchar();
} else while(1) {
TAOS_RES* res = taos_consume(tsub);
if (res == NULL) {
printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
} else
while (1) {
TAOS_RES* res = taos_consume(tsub);
if (res == NULL) {
printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
}
}
}
printf("total rows consumed: %d\n", nTotalRows);
taos_unsubscribe(tsub, keep);
......
......@@ -125,7 +125,7 @@ class TDDnode:
"charset":"UTF-8",
"asyncLog":"0",
"anyIp":"0",
"tsEnableTelemetryReporting":"0",
"telemetryReporting":"0",
"dDebugFlag":"135",
"tsdbDebugFlag":"135",
"mDebugFlag":"135",
......
......@@ -7,7 +7,7 @@ LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
CFLAGS = -O0 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
-Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX \
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99 \
-fsanitize=address
-fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment
all: $(TARGET)
......@@ -15,10 +15,10 @@ exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS)
gcc $(CFLAGS) ./stmtTest.c -o $(ROOT)stmtTest $(LFLAGS)
gcc $(CFLAGS) ./stmt_function.c -o $(ROOT)stmt_function $(LFLAGS)
gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
clean:
rm $(ROOT)batchprepare
rm $(ROOT)stmtBatchTest
rm $(ROOT)stmtTest
rm $(ROOT)stmt_function
rm $(ROOT)stmt
此差异已折叠。
此差异已折叠。
......@@ -941,6 +941,17 @@ if $data32 != 0.000144445 then
return -1
endi
sql insert into t1 values('2015-09-18 00:30:00', 3.0);
sql select irate(k) from t1
if $rows != 1 then
return -1
endi
if $data00 != 0.000000354 then
return -1
endi
print ===========================> derivative
sql drop table t1
sql drop table tx;
......
......@@ -497,7 +497,7 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "jdbc" ] && [ "$2" !=
totalExamplePass=`expr $totalExamplePass + 1`
fi
./prepare 127.0.0.1 > /dev/null 2>&1
./prepare > /dev/null 2>&1
if [ $? != "0" ]; then
echo "prepare failed"
totalExampleFailed=`expr $totalExampleFailed + 1`
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册