提交 4a69ca01 编写于 作者: Z zhihaop

Merge branch '2.6' into improve/taosc-async-enhancement-for-2.6

...@@ -34,11 +34,6 @@ CREATE DATABASE db_name PRECISION 'ns'; ...@@ -34,11 +34,6 @@ CREATE DATABASE db_name PRECISION 'ns';
| 10 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\’`。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。 | | 10 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\’`。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。 |
| 11 | JSON | | json 数据类型, 只有 tag 可以是 json 格式 | | 11 | JSON | | json 数据类型, 只有 tag 可以是 json 格式 |
:::tip
TDengine 对 SQL 语句中的英文字符不区分大小写,自动转化为小写执行。因此用户大小写敏感的字符串及密码,需要使用单引号将字符串引起来。
:::
:::note :::note
虽然 BINARY 类型在底层存储上支持字节型的二进制字符,但不同编程语言对二进制数据的处理方式并不保证一致,因此建议在 BINARY 类型中只存储 ASCII 可见字符,而避免存储不可见字符。多字节的数据,例如中文字符,则需要使用 NCHAR 类型进行保存。如果强行使用 BINARY 类型保存中文字符,虽然有时也能正常读写,但并不带有字符集信息,很容易出现数据乱码甚至数据损坏等情况。 虽然 BINARY 类型在底层存储上支持字节型的二进制字符,但不同编程语言对二进制数据的处理方式并不保证一致,因此建议在 BINARY 类型中只存储 ASCII 可见字符,而避免存储不可见字符。多字节的数据,例如中文字符,则需要使用 NCHAR 类型进行保存。如果强行使用 BINARY 类型保存中文字符,虽然有时也能正常读写,但并不带有字符集信息,很容易出现数据乱码甚至数据损坏等情况。
......
...@@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy); ...@@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql); int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); void doCleanupSubqueries(SSqlObj *pSql);
void tscFreeRetrieveSup(void **param); void tscFreeRetrieveSup(void **param);
......
...@@ -351,7 +351,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC ...@@ -351,7 +351,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries); int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
......
...@@ -361,6 +361,7 @@ typedef struct SSubqueryState { ...@@ -361,6 +361,7 @@ typedef struct SSubqueryState {
int8_t *states; int8_t *states;
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
uint32_t version;
} SSubqueryState; } SSubqueryState;
typedef struct SSqlObj { typedef struct SSqlObj {
...@@ -388,7 +389,6 @@ typedef struct SSqlObj { ...@@ -388,7 +389,6 @@ typedef struct SSqlObj {
SSqlRes res; SSqlRes res;
SSubqueryState subState; SSubqueryState subState;
pthread_mutex_t mtxSubs; // avoid double access pSubs after failure
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj *rootObj; struct SSqlObj *rootObj;
...@@ -441,6 +441,12 @@ typedef struct SSqlStream { ...@@ -441,6 +441,12 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next; struct SSqlStream *prev, *next;
} SSqlStream; } SSqlStream;
SSqlObj* tscAllocSqlObj();
uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql);
SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx, uint32_t stateVersion);
void tscReleaseRefOfSubobj(SSqlObj *pSql);
void tscResetAllSubStates(SSqlObj* pSql);
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj); int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
......
...@@ -363,8 +363,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -363,8 +363,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->fetchFp = fp; pSql->fetchFp = fp;
pSql->rootObj = pSql; pSql->rootObj = pSql;
pthread_mutex_init(&pSql->mtxSubs, NULL);
registerSqlObj(pSql); registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
...@@ -435,7 +433,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v ...@@ -435,7 +433,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v
nPrintTsc("%s", sqlstr); nPrintTsc("%s", sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
......
...@@ -117,7 +117,7 @@ void writeMsgVgId(char * payload, int32_t vgId) { ...@@ -117,7 +117,7 @@ void writeMsgVgId(char * payload, int32_t vgId) {
SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) { SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) {
// Init // Init
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self); tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -153,7 +153,6 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie ...@@ -153,7 +153,6 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie
// update vgroup id // update vgroup id
writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId); writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId);
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew); registerSqlObj(pNew);
tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew); tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew);
...@@ -196,24 +195,26 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -196,24 +195,26 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
} }
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
if(numOfSub == 0) { if(numOfSub == 0) {
tscInfo(":CDEL SQL:%p tablename=%s numOfVgroups is zero, maybe empty table.", pSql, pTableMetaInfo->name.tname); tscInfo(":CDEL SQL:%p tablename=%s numOfVgroups is zero, maybe empty table.", pSql, pTableMetaInfo->name.tname);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
ret = doInitSubState(pSql, numOfSub); ret = doReInitSubState(pSql, numOfSub);
if (ret != 0) { if (ret != TSDB_CODE_SUCCESS) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return ret; return ret;
} }
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i; int32_t i = 0;
for (i = 0; i < pState->numOfSub; ++i) {
{ pthread_mutex_lock(&pSql->subState.mutex);
for (i = 0; i < pSql->subState.numOfSub; ++i) {
// vgroup // vgroup
SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i]; SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i];
...@@ -235,23 +236,20 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -235,23 +236,20 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
} }
if (i < pState->numOfSub) { if (i < pSql->subState.numOfSub) {
tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self); tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
} }
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { pthread_mutex_unlock(&pSql->subState.mutex); }
doCleanupSubqueries(pSql, i);
if (pRes->code != TSDB_CODE_SUCCESS) {
doCleanupSubqueries(pSql);
return pRes->code; return pRes->code;
} }
// send sub sql // send sub sql
doConcurrentlySendSubQueries(pSql); doConcurrentlySendSubQueries(pSql);
//return TSDB_CODE_TSC_QUERY_CANCELLED;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -295,7 +295,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) { ...@@ -295,7 +295,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) {
taos_fetch_rows_a(tres, tscSCreateCallBack, param); taos_fetch_rows_a(tres, tscSCreateCallBack, param);
builder->callStage = SCREATE_CALLBACK_RETRIEVE; builder->callStage = SCREATE_CALLBACK_RETRIEVE;
} else { } else {
char *result = calloc(1, TSDB_MAX_BINARY_LEN); char *result = calloc(1, TSDB_MAX_SQL_LEN);
pRes->code = builder->fp(builder, result); pRes->code = builder->fp(builder, result);
taos_free_result(pSql); taos_free_result(pSql);
...@@ -359,6 +359,7 @@ TAOS_ROW tscFetchRow(void *param) { ...@@ -359,6 +359,7 @@ TAOS_ROW tscFetchRow(void *param) {
tscClearSqlOwner(pSql); tscClearSqlOwner(pSql);
return data; return data;
} }
static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
TAOS_ROW row = tscFetchRow(builder); TAOS_ROW row = tscFetchRow(builder);
SSqlObj* pSql = builder->pInterSql; SSqlObj* pSql = builder->pInterSql;
...@@ -381,16 +382,16 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { ...@@ -381,16 +382,16 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
int32_t ret = tscGetNthFieldResult(row, fields, lengths, i, buf); int32_t ret = tscGetNthFieldResult(row, fields, lengths, i, buf);
if (i == 0) { if (i == 0) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s", "("); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "%s", "(");
} }
if ((fields[i].type == TSDB_DATA_TYPE_NCHAR if ((fields[i].type == TSDB_DATA_TYPE_NCHAR
|| fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_BINARY
|| fields[i].type == TSDB_DATA_TYPE_TIMESTAMP) && 0 == ret) { || fields[i].type == TSDB_DATA_TYPE_TIMESTAMP) && 0 == ret) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "\"%s\",", buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "\"%s\",", buf);
} else if (fields[i].type == TSDB_DATA_TYPE_JSON) { } else if (fields[i].type == TSDB_DATA_TYPE_JSON) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "'%s,", buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "'%s,", buf);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s,", buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "%s,", buf);
} }
free(buf); free(buf);
...@@ -487,14 +488,14 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) { ...@@ -487,14 +488,14 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) {
SCreateBuilder *builder = (SCreateBuilder *)param; SCreateBuilder *builder = (SCreateBuilder *)param;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char *buf = calloc(1,TSDB_MAX_BINARY_LEN); char *buf = calloc(1,TSDB_MAX_SQL_LEN);
if (buf == NULL) { if (buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
code = tscGetTableTagValue(builder, buf); code = tscGetTableTagValue(builder, buf);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "CREATE TABLE `%s` USING `%s` TAGS %s", builder->buf, builder->sTableName, buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "CREATE TABLE `%s` USING `%s` TAGS %s", builder->buf, builder->sTableName, buf);
code = tscSCreateBuildResult(builder->pParentSql, SCREATE_BUILD_TABLE, builder->buf, result); code = tscSCreateBuildResult(builder->pParentSql, SCREATE_BUILD_TABLE, builder->buf, result);
} }
free(buf); free(buf);
...@@ -537,7 +538,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { ...@@ -537,7 +538,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
int32_t* lengths = taos_fetch_lengths(pSql); int32_t* lengths = taos_fetch_lengths(pSql);
int32_t ret = tscGetNthFieldResult(row, fields, lengths, 0, buf); int32_t ret = tscGetNthFieldResult(row, fields, lengths, 0, buf);
if (0 == ret && 0 == strcmp(buf, builder->buf)) { if (0 == ret && 0 == strcmp(buf, builder->buf)) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "CREATE DATABASE `%s`", buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "CREATE DATABASE `%s`", buf);
for (int i = 1; i < num_fields; i++) { for (int i = 1; i < num_fields; i++) {
for (int j = 0; showColumns[j][0] != NULL; j++) { for (int j = 0; showColumns[j][0] != NULL; j++) {
if (STR_NOCASE_EQUAL(fields[i].name, strlen(fields[i].name), showColumns[j][0], strlen(showColumns[j][0]))) { if (STR_NOCASE_EQUAL(fields[i].name, strlen(fields[i].name), showColumns[j][0], strlen(showColumns[j][0]))) {
...@@ -545,9 +546,9 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { ...@@ -545,9 +546,9 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
ret = tscGetNthFieldResult(row, fields, lengths, i, buf); ret = tscGetNthFieldResult(row, fields, lengths, i, buf);
if (ret == 0) { if (ret == 0) {
if (STR_NOCASE_EQUAL(showColumns[j][0], strlen(showColumns[j][0]), "PRECISION", strlen("PRECISION"))) { if (STR_NOCASE_EQUAL(showColumns[j][0], strlen(showColumns[j][0]), "PRECISION", strlen("PRECISION"))) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), " %s '%s'", showColumns[j][1], buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), " %s '%s'", showColumns[j][1], buf);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), " %s %s", showColumns[j][1], buf); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), " %s %s", showColumns[j][1], buf);
} }
} }
} }
...@@ -569,7 +570,7 @@ int32_t tscRebuildCreateDBStatement(void *param,char *result) { ...@@ -569,7 +570,7 @@ int32_t tscRebuildCreateDBStatement(void *param,char *result) {
SCreateBuilder *builder = (SCreateBuilder *)param; SCreateBuilder *builder = (SCreateBuilder *)param;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char *buf = calloc(1, TSDB_MAX_BINARY_LEN); char *buf = calloc(1, TSDB_MAX_SQL_LEN);
if (buf == NULL) { if (buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -582,7 +583,7 @@ int32_t tscRebuildCreateDBStatement(void *param,char *result) { ...@@ -582,7 +583,7 @@ int32_t tscRebuildCreateDBStatement(void *param,char *result) {
} }
static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) { static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
char *buf = (char *)malloc(TSDB_MAX_BINARY_LEN); char *buf = (char *)malloc(TSDB_MAX_SQL_LEN);
if (buf == NULL) { if (buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -599,22 +600,23 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) { ...@@ -599,22 +600,23 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
int32_t numOfTags = tscGetNumOfTags(pMeta); int32_t numOfTags = tscGetNumOfTags(pMeta);
for (int32_t i = 0; i < numOfTags; i++) { for (int32_t i = 0; i < numOfTags; i++) {
if (i != numOfTags - 1) { if (i != numOfTags - 1) {
snprintf(buf + strlen(buf), TSDB_MAX_BINARY_LEN - strlen(buf), "`%s`,", pTagsSchema[i].name); snprintf(buf + strlen(buf), TSDB_MAX_SQL_LEN - strlen(buf), "`%s`,", pTagsSchema[i].name);
} else { } else {
snprintf(buf + strlen(buf), TSDB_MAX_BINARY_LEN - strlen(buf), "`%s`", pTagsSchema[i].name); snprintf(buf + strlen(buf), TSDB_MAX_SQL_LEN - strlen(buf), "`%s`", pTagsSchema[i].name);
} }
} }
*result = buf; *result = buf;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) { static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta; STableMeta * pMeta = pTableMetaInfo->pTableMeta;
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) { if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -640,7 +642,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch ...@@ -640,7 +642,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
param->fp = tscRebuildCreateTableStatement; param->fp = tscRebuildCreateTableStatement;
param->callStage = SCREATE_CALLBACK_QUERY; param->callStage = SCREATE_CALLBACK_QUERY;
char *query = (char *)calloc(1, TSDB_MAX_BINARY_LEN); char *query = (char *)calloc(1, TSDB_MAX_SQL_LEN);
if (query == NULL) { if (query == NULL) {
free(param); free(param);
free(pInterSql); free(pInterSql);
...@@ -656,7 +658,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch ...@@ -656,7 +658,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
return code; return code;
} }
snprintf(query + strlen(query), TSDB_MAX_BINARY_LEN - strlen(query), "SELECT %s FROM %s WHERE TBNAME IN(\'%s\')", columns, fullName, tblName); snprintf(query + strlen(query), TSDB_MAX_SQL_LEN - strlen(query), "SELECT %s FROM %s WHERE TBNAME IN(\'%s\')", columns, fullName, tblName);
doAsyncQuery(pSql->pTscObj, pInterSql, tscSCreateCallBack, param, query, strlen(query)); doAsyncQuery(pSql->pTscObj, pInterSql, tscSCreateCallBack, param, query, strlen(query));
free(query); free(query);
free(columns); free(columns);
...@@ -681,9 +683,9 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, ...@@ -681,9 +683,9 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
if (type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE; bytes = bytes/TSDB_NCHAR_SIZE;
} }
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name, tDataTypes[pSchema[i].type].name, bytes); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name, tDataTypes[pSchema[i].type].name, bytes);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[pSchema[i].type].name); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[pSchema[i].type].name);
} }
} }
sprintf(result + strlen(result) - 1, "%s", ")"); sprintf(result + strlen(result) - 1, "%s", ")");
...@@ -708,12 +710,12 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, ...@@ -708,12 +710,12 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
if (type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE; bytes = bytes/TSDB_NCHAR_SIZE;
} }
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result),"`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name);
} }
} }
snprintf(result + strlen(result) - 1, TSDB_MAX_BINARY_LEN - strlen(result), "%s %s", ")", "TAGS ("); snprintf(result + strlen(result) - 1, TSDB_MAX_SQL_LEN - strlen(result), "%s %s", ")", "TAGS (");
for (int32_t i = numOfRows; i < totalRows; i++) { for (int32_t i = numOfRows; i < totalRows; i++) {
uint8_t type = pSchema[i].type; uint8_t type = pSchema[i].type;
...@@ -722,9 +724,9 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, ...@@ -722,9 +724,9 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
if (type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE; bytes = bytes/TSDB_NCHAR_SIZE;
} }
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s(%d),", pSchema[i].name,tDataTypes[pSchema[i].type].name, bytes);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name); snprintf(result + strlen(result), TSDB_MAX_SQL_LEN - strlen(result), "`%s` %s,", pSchema[i].name, tDataTypes[type].name);
} }
} }
sprintf(result + strlen(result) - 1, "%s", ")"); sprintf(result + strlen(result) - 1, "%s", ")");
...@@ -742,7 +744,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) { ...@@ -742,7 +744,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
char *result = (char *)calloc(1, TSDB_MAX_BINARY_LEN); char *result = (char *)calloc(1, TSDB_MAX_SQL_LEN);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscRebuildDDLForSuperTable(pSql, tableName, result); code = tscRebuildDDLForSuperTable(pSql, tableName, result);
...@@ -766,7 +768,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) { ...@@ -766,7 +768,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) { if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
......
...@@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab ...@@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab
int32_t code = 0; int32_t code = 0;
STableMeta* tableMeta = NULL; STableMeta* tableMeta = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno)); tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) { ...@@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
//make a dummy SSqlObj //make a dummy SSqlObj
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) { static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
return NULL; return NULL;
} }
...@@ -2771,7 +2771,6 @@ static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t cod ...@@ -2771,7 +2771,6 @@ static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t cod
pNew->pTscObj = taos; pNew->pTscObj = taos;
pNew->fp = NULL; pNew->fp = NULL;
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew); registerSqlObj(pNew);
pNew->res.numOfRows = affected_rows; pNew->res.numOfRows = affected_rows;
......
...@@ -1587,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -1587,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
} }
pStmt->taos = pObj; pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
free(pStmt); free(pStmt);
...@@ -1604,7 +1604,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -1604,7 +1604,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
return NULL; return NULL;
} }
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
......
...@@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
// } else { // } else {
// pQdesc->stableQuery = 0; // pQdesc->stableQuery = 0;
// } // }
pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs != NULL && pSql->subState.states != NULL) { if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
// because subState maybe free on anytime by any thread, check validate from here // because subState maybe free on anytime by any thread, check validate from here
...@@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
} }
} }
pQdesc->numOfSub = pSql->subState.numOfSub; pQdesc->numOfSub = pSql->subState.numOfSub;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
} }
pQdesc->numOfSub = htonl(pQdesc->numOfSub); pQdesc->numOfSub = htonl(pQdesc->numOfSub);
......
...@@ -10081,7 +10081,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -10081,7 +10081,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
size_t tableMetaCapacity = 0; size_t tableMetaCapacity = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
tableNameList = taosArrayInit(4, sizeof(SName)); tableNameList = taosArrayInit(4, sizeof(SName));
size_t size = taosArrayGetSize(pInfo->list); size_t size = taosArrayGetSize(pInfo->list);
......
...@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
pSql->lastAlive = taosGetTimestampMs(); pSql->lastAlive = taosGetTimestampMs();
} }
} else { } else {
// lock subs { pthread_mutex_lock(&pSql->subState.mutex);
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) { if (pSql->pSubs) {
// have sub sql // have sub sql
for (int i = 0; i < pSql->subState.numOfSub; i++) { for (int i = 0; i < pSql->subState.numOfSub; i++) {
...@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
} }
} }
} }
// unlock
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex); }
} }
// kill query // kill query
...@@ -2564,7 +2564,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2564,7 +2564,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
} }
if (pParentCmd->pTableMetaMap == NULL) { if (pParentCmd->pTableMetaMap == NULL) {
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
} }
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
...@@ -2810,7 +2810,7 @@ static void createHbObj(STscObj* pObj) { ...@@ -2810,7 +2810,7 @@ static void createHbObj(STscObj* pObj) {
return; return;
} }
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) return; if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp; pSql->fp = tscProcessHeartBeatRsp;
...@@ -3083,7 +3083,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -3083,7 +3083,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) { static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) { if (NULL == pNew) {
tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self); tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3149,7 +3149,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn ...@@ -3149,7 +3149,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
} }
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) { int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) { if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self); tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3286,7 +3286,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create ...@@ -3286,7 +3286,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
} }
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) { if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql); tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3391,15 +3391,16 @@ int tscRenewTableMeta(SSqlObj *pSql) { ...@@ -3391,15 +3391,16 @@ int tscRenewTableMeta(SSqlObj *pSql) {
tscResetSqlCmd(pCmd, true, pSql->self); tscResetSqlCmd(pCmd, true, pSql->self);
SSqlCmd* pCmd2 = &pSql->rootObj->cmd; SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap); SHashObj *pmap = pCmd2->pTableMetaMap;
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pmap == atomic_val_compare_exchange_ptr(&pCmd2->pTableMetaMap, pmap, NULL)) {
tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
}
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pSql->rootObj->retryReason = pSql->retryReason; pSql->rootObj->retryReason = pSql->retryReason;
SSqlObj *rootSql = pSql->rootObj; SSqlObj *rootSql = pSql->rootObj;
pthread_mutex_lock(&rootSql->mtxSubs);
tscFreeSubobj(rootSql); tscFreeSubobj(rootSql);
pthread_mutex_unlock(&rootSql->mtxSubs);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
...@@ -3426,7 +3427,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { ...@@ -3426,7 +3427,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
if (allVgroupInfoRetrieved(pQueryInfo)) { if (allVgroupInfoRetrieved(pQueryInfo)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
......
...@@ -149,7 +149,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -149,7 +149,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) { if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj); tscReleaseRpc(pRpcObj);
...@@ -164,8 +164,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -164,8 +164,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pSql->param = param; pSql->param = param;
pSql->cmd.command = TSDB_SQL_CONNECT; pSql->cmd.command = TSDB_SQL_CONNECT;
tsem_init(&pSql->rspSem, 0, 0);
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj); tscReleaseRpc(pRpcObj);
...@@ -305,10 +303,6 @@ void taos_close(TAOS *taos) { ...@@ -305,10 +303,6 @@ void taos_close(TAOS *taos) {
tscDebug("0x%"PRIx64" HB is freed", pHb->self); tscDebug("0x%"PRIx64" HB is freed", pHb->self);
taosReleaseRef(tscObjRef, pHb->self); taosReleaseRef(tscObjRef, pHb->self);
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init(&pHb->rspSem, 0, 0);
#endif // __APPLE__
taos_free_result(pHb); taos_free_result(pHb);
} }
} }
...@@ -363,14 +357,13 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t* ...@@ -363,14 +357,13 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t*
nPrintTsc("%s", sqlstr); nPrintTsc("%s", sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
tsem_init(&pSql->rspSem, 0, 0);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
if (res != NULL) { if (res != NULL) {
...@@ -754,7 +747,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -754,7 +747,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscLockByThread(&pSql->squeryLock); tscLockByThread(&pSql->squeryLock);
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int i = 0; i < pSql->subState.numOfSub; ++i) { for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here // NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i]; SSqlObj *pSub = pSql->pSubs[i];
...@@ -774,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -774,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
// taosRelekaseRef(tscObjRef, pSubObj->self); // taosRelekaseRef(tscObjRef, pSubObj->self);
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pSql->subState.numOfSub <= 0) { if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
...@@ -1047,7 +1044,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -1047,7 +1044,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED;
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
...@@ -1165,7 +1162,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1165,7 +1162,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
tscAllocPayload(&pSql->cmd, 1024); tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos; pSql->pTscObj = taos;
...@@ -1182,7 +1179,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1182,7 +1179,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code; return code;
} }
pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj); tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
......
...@@ -1002,7 +1002,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t ...@@ -1002,7 +1002,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t
return NULL; return NULL;
} }
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
...@@ -1054,7 +1054,6 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t ...@@ -1054,7 +1054,6 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t
pSql->fetchFp = tscCreateStream; pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID; pSql->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pSql->rspSem, 0, 0);
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
......
...@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { ...@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
} }
} }
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSub *pSub = ((SSub *)param); SSub *pSub = ((SSub *)param);
...@@ -117,7 +116,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -117,7 +116,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail; goto fail;
} }
pSql = calloc(1, sizeof(SSqlObj)); pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
line = __LINE__; line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -132,11 +131,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -132,11 +131,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
line = __LINE__;
code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
pSql->param = pSub; pSql->param = pSub;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
...@@ -432,7 +426,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char ...@@ -432,7 +426,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
} }
SSqlObj* recreateSqlObj(SSub* pSub) { SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
...@@ -442,10 +436,6 @@ SSqlObj* recreateSqlObj(SSub* pSub) { ...@@ -442,10 +436,6 @@ SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
tscFreeSqlObj(pSql);
return NULL;
}
pSql->param = pSub; pSql->param = pSub;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
......
此差异已折叠。
...@@ -1647,7 +1647,10 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { ...@@ -1647,7 +1647,10 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
pCmd->insertParam.tagData.dataLen = 0; pCmd->insertParam.tagData.dataLen = 0;
tscFreeQueryInfo(pCmd, clearCachedMeta, id); tscFreeQueryInfo(pCmd, clearCachedMeta, id);
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); SHashObj *pmap = pCmd->pTableMetaMap;
if (pmap == atomic_val_compare_exchange_ptr(&pCmd->pTableMetaMap, pmap, NULL)) {
tscCleanupTableMetaMap(pCmd->pTableMetaMap);
}
taosReleaseRef(tscObjRef, id); taosReleaseRef(tscObjRef, id);
} }
...@@ -1678,33 +1681,30 @@ void tscFreeSqlResult(SSqlObj* pSql) { ...@@ -1678,33 +1681,30 @@ void tscFreeSqlResult(SSqlObj* pSql) {
} }
void tscFreeSubobj(SSqlObj* pSql) { void tscFreeSubobj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) { if (pSql->subState.numOfSub == 0) {
return; goto _out;
} }
pSql->subState.version ++;
pthread_mutex_lock(&pSql->subState.mutex);
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) { if (!pSql->pSubs[i]) {
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
} else {
/* just for python error test case */
tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i); tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i);
continue;
} }
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
taos_free_result(pSql->pSubs[i]); taos_free_result(pSql->pSubs[i]);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
tfree(pSql->pSubs);
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
tfree(pSql->pSubs); _out:
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
} }
/** /**
...@@ -1741,6 +1741,52 @@ void tscFreeMetaSqlObj(int64_t *rid){ ...@@ -1741,6 +1741,52 @@ void tscFreeMetaSqlObj(int64_t *rid){
} }
} }
SSqlObj* tscAllocSqlObj() {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (!pNew) {
return NULL;
}
int rc = tsem_init(&pNew->rspSem, 0, 0);
assert(rc == 0 && "tsem_init failure");
rc = pthread_mutex_init(&pNew->subState.mutex, NULL);
assert(rc == 0 && "pthread_mutex_init failure");
return pNew;
}
SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx, uint32_t stateVersion) {
assert (pSql != NULL);
SSqlObj *pSub = NULL;
{ pthread_mutex_lock(&pSql->subState.mutex);
if (stateVersion != tscGetVersionOfSubStateWithoutLock(pSql) ||
idx < 0 ||
idx >= pSql->subState.numOfSub ||
!pSql->pSubs[idx]) {
goto _out;
}
pSub = taosAcquireRef(tscObjRef, pSql->pSubs[idx]->self);
assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch");
_out:
pthread_mutex_unlock(&pSql->subState.mutex); }
return pSub;
}
void tscReleaseRefOfSubobj(SSqlObj* pSub) {
assert (pSub != NULL && pSub->self != 0 && "Subquery obj not refcounted");
taosReleaseRef(tscObjRef, pSub->self);
}
uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql) {
return pSql->subState.version;
}
void tscResetAllSubStates(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
memset(pSql->subState.states, 0, sizeof(pSql->subState.states[0]) * pSql->subState.numOfSub);
pthread_mutex_unlock(&pSql->subState.mutex);
}
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return; return;
...@@ -1764,14 +1810,11 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1764,14 +1810,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
if (pSql && (pSql == pSql->rootObj)) {
pthread_mutex_destroy(&pSql->mtxSubs);
}
pSql->signature = NULL; pSql->signature = NULL;
pSql->fp = NULL; pSql->fp = NULL;
tfree(pSql->sqlstr); tfree(pSql->sqlstr);
tfree(pSql->pBuf); tfree(pSql->pBuf);
pSql->self = 0; pSql->self = 0;
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -1782,6 +1825,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1782,6 +1825,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql); tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql);
pthread_mutex_destroy(&pSql->subState.mutex);
tsem_destroy(&pSql->rspSem); tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql)); memset(pSql, 0, sizeof(*pSql));
free(pSql); free(pSql);
...@@ -3889,7 +3933,7 @@ void registerSqlObj(SSqlObj* pSql) { ...@@ -3889,7 +3933,7 @@ void registerSqlObj(SSqlObj* pSql) {
} }
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0); tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0);
return NULL; return NULL;
...@@ -3901,7 +3945,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in ...@@ -3901,7 +3945,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd; pCmd->command = cmd;
tsem_init(&pNew->rspSem, 0 ,0);
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
...@@ -3966,7 +4009,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui ...@@ -3966,7 +4009,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) { SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex); tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3980,7 +4023,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3980,7 +4023,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->signature = pNew; pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj; pNew->rootObj = pSql->rootObj;
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd; SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
...@@ -4305,26 +4347,25 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -4305,26 +4347,25 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
//bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail.
//assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL);
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
int32_t code = TSDB_CODE_SUCCESS;
pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL); { pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES);
pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t));
if (pSql->pSubs == NULL || pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pSql->subState.numOfSub = numOfSubqueries; pSql->subState.numOfSub = numOfSubqueries;
pSql->subState.version ++;
return TSDB_CODE_SUCCESS; pthread_mutex_unlock(&pSql->subState.mutex); }
return code;
} }
// do execute the query according to the query execution plan // do execute the query according to the query execution plan
...@@ -4349,21 +4390,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4349,21 +4390,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
// upstream may be freed before retry // upstream may be freed before retry
if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream)); code = doReInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
uint32_t stateVersion = 0;
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
pSql->cmd.active = pSub; pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; errflag = 1;
break;
} }
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
...@@ -4376,24 +4422,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4376,24 +4422,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->cmd.resColumnId = TSDB_RES_COL_ID; pNew->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pNew->rspSem, 0, 0);
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
if (ps == NULL) { if (ps == NULL) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
goto _error; errflag = 1;
break;
} }
ps->pParentSql = pSql; ps->pParentSql = pSql;
ps->subqueryIndex = i; ps->subqueryIndex = i;
pNew->param = ps; pNew->param = ps;
registerSqlObj(pNew);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
goto _error; errflag = 1;
break;
} }
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
...@@ -4403,9 +4451,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4403,9 +4451,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
numOfInit++; numOfInit++;
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) {
goto _error;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* psub = pSql->pSubs[i]; SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
registerSqlObj(psub); if (!psub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
code = TSDB_CODE_FAILED;
goto _error;
}
// create sub query to handle the sub query. // create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
...@@ -4415,30 +4477,21 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4415,30 +4477,21 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} }
executeQuery(psub, pq); executeQuery(psub, pq);
tscReleaseRefOfSubobj(psub); // REL ref
} }
return; return;
} else if (hasMoreClauseToTry(pSql)) {
if (pthread_mutex_init(&pSql->subState.mutex, NULL) != 0) {
goto _error;
}
} }
pSql->cmd.active = pQueryInfo; pSql->cmd.active = pQueryInfo;
doExecuteQuery(pSql, pQueryInfo); doExecuteQuery(pSql, pQueryInfo);
return; return;
_error: _error:
for(int32_t i = 0; i < numOfInit; ++i) {
SSqlObj* p = pSql->pSubs[i];
tscFreeSqlObj(p);
}
pSql->res.code = code; pSql->res.code = code;
pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed tscFreeSubobj(pSql);
tfree(pSql->subState.states);
tfree(pSql->pSubs);
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
...@@ -4682,7 +4735,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -4682,7 +4735,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* *
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed. * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/ */
pSql->subState.numOfSub = 0; tscFreeSubobj(pSql);
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
......
...@@ -67,6 +67,8 @@ extern int32_t tsCompressColData; ...@@ -67,6 +67,8 @@ extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults; extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[]; extern char tsTempDir[];
extern int32_t tsShortcutFlag; extern int32_t tsShortcutFlag;
extern int32_t tsMaxSqlGroups;
extern int8_t tsSortWhenGroupBy;
// query buffer management // query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
......
...@@ -118,6 +118,12 @@ int32_t tsRetryStreamCompDelay = 30 * 60 * 1000; ...@@ -118,6 +118,12 @@ int32_t tsRetryStreamCompDelay = 30 * 60 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default. // The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f; float tsStreamComputDelayRatio = 0.1f;
// max supported groups for group by clause / interval clause
int32_t tsMaxSqlGroups = 1000000;
// order by first group by column when group by
int8_t tsSortWhenGroupBy = 1;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
...@@ -1782,6 +1788,26 @@ static void doInitGlobalConfig(void) { ...@@ -1782,6 +1788,26 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "maxSqlGroups";
cfg.ptr = &tsMaxSqlGroups;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 500000;
cfg.maxValue = 10000000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "sortWhenGroupBy";
cfg.ptr = &tsSortWhenGroupBy;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -621,7 +621,7 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult ...@@ -621,7 +621,7 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult
} }
// too many time window in query // too many time window in query
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { if (pResultRowInfo->size > tsMaxSqlGroups) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -1925,7 +1925,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo* pIn ...@@ -1925,7 +1925,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo* pIn
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
char* key = NULL; char* key = NULL;
int16_t num = 0; int32_t num = 0;
int32_t type = 0; int32_t type = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key); buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key);
...@@ -7607,7 +7607,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -7607,7 +7607,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
} }
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo);
if (!pRuntimeEnv->pQueryAttr->stableQuery) { if (!pRuntimeEnv->pQueryAttr->stableQuery && tsSortWhenGroupBy) {
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes, pInfo->binfo.pCtx); sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes, pInfo->binfo.pCtx);
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 138 #define TSDB_CFG_MAX_NUM 140
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
...@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid) ...@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid)
return taosDecRefCount(rsetId, rid, 1); return taosDecRefCount(rsetId, rid, 1);
} }
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid) void *taosAcquireRef(int rsetId, int64_t rid)
{ {
int hash; int hash;
......
...@@ -96,6 +96,23 @@ class TDTestCase: ...@@ -96,6 +96,23 @@ class TDTestCase:
tdSql.execute("drop stable if exists db.st") tdSql.execute("drop stable if exists db.st")
tdSql.execute("create table stb(ts timestamp, c1 int) tags(t1 int)") tdSql.execute("create table stb(ts timestamp, c1 int) tags(t1 int)")
tdSql.error("create table `` using stb tags(1)") tdSql.error("create table `` using stb tags(1)")
# TS-1760
sql = "create table stb9 (ts timestamp"
for i in range(999):
sql += ", longcolumntest%d double" % i
sql += ") tags(t1 int)"
tdSql.execute(sql)
tdSql.query("describe stb9")
tdSql.checkRows(1001)
tdSql.query("show create table stb9")
query = tdSql.getData(0, 1)
tdSql.execute("drop table if exists stb9")
tdSql.execute(query)
tdSql.query("describe stb9")
tdSql.checkRows(1001)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册