提交 b949b047 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

......@@ -349,7 +349,7 @@ typedef struct SSqlStream {
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
int tsParseSql(SSqlObj *pSql, bool initialParse);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet);
int tscProcessSql(SSqlObj *pSql);
......
......@@ -39,41 +39,26 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
int doAsyncParseSql(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to malloc payload");
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return code;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
return tsParseSql(pSql, true);
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return;
}
strtolower(pSql->sqlstr, sqlstr);
int32_t code = doAsyncParseSql(pSql);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
pSql->cmd.curSql = pSql->sqlstr;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -1014,42 +1014,37 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
* @param pSql
* @return
*/
int doParseInsertSql(SSqlObj *pSql, char *str) {
int tsParseInsertSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
char* str = pCmd->curSql;
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
STableMetaInfo *pTableMetaInfo = NULL;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = NULL;
if (pQueryInfo->numOfTables == 0) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
} else {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
}
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536
// but TSDB_PAYLOAD_SIZE is 65380
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) {
return code;
}
assert(((NULL == pCmd->curSql) && (NULL == pCmd->pTableList))
|| ((NULL != pCmd->curSql) && (NULL != pCmd->pTableList)));
if ((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) {
if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error_clean;
goto _error;
}
} else {
assert((NULL != pCmd->curSql) && (NULL != pCmd->pTableList));
str = pCmd->curSql;
}
......@@ -1075,7 +1070,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
*/
if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL;
goto _error_clean;
goto _error;
} else {
break;
}
......@@ -1086,11 +1081,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
// Check if the table name available or not
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
goto _error_clean;
goto _error;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
ptrdiff_t pos = pCmd->curSql - pSql->sqlstr;
......@@ -1103,20 +1098,19 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
* interrupted position.
*/
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql,
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 ", %s", pSql,
pos, pCmd->curSql);
return code;
}
// todo add to return
tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code));
tscError("%p async insert parse error, code:%d, reason:%s", pSql, code, tstrerror(code));
pCmd->curSql = NULL;
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
goto _error;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error_clean;
goto _error;
}
index = 0;
......@@ -1125,7 +1119,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
goto _error_clean;
goto _error;
}
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -1137,7 +1131,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
/*
......@@ -1146,11 +1140,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
*/
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
} else if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
index = 0;
......@@ -1158,7 +1152,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
str += index;
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error_clean;
goto _error;
}
char fname[PATH_MAX] = {0};
......@@ -1168,7 +1162,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
wordexp_t full_path;
if (wordexp(fname, &full_path, 0) != 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
goto _error_clean;
goto _error;
}
strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path);
......@@ -1179,7 +1173,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
pTableMeta, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
......@@ -1190,7 +1184,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
SParsedDataColInfo spd = {0};
......@@ -1226,7 +1220,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (spd.hasVal[t] == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _error_clean;
goto _error;
}
spd.hasVal[t] = true;
......@@ -1237,13 +1231,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
goto _error_clean;
goto _error;
}
}
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
goto _error_clean;
goto _error;
}
index = 0;
......@@ -1252,16 +1246,16 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (sToken.type != TK_VALUES) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
goto _error_clean;
goto _error;
}
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
} else {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
goto _error_clean;
goto _error;
}
}
......@@ -1272,7 +1266,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
......@@ -1281,7 +1275,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
code = TSDB_CODE_SUCCESS;
goto _clean;
_error_clean:
_error:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean:
......@@ -1294,7 +1288,7 @@ _clean:
return code;
}
int tsParseInsertSql(SSqlObj *pSql) {
int tsInsertInitialCheck(SSqlObj *pSql) {
if (!pSql->pTscObj->writeAuth) {
return TSDB_CODE_TSC_NO_WRITE_AUTH;
}
......@@ -1312,30 +1306,23 @@ int tsParseInsertSql(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
}
return doParseInsertSql(pSql, pSql->sqlstr + index);
pCmd->curSql = sToken.z + sToken.n;
return TSDB_CODE_SUCCESS;
}
int tsParseSql(SSqlObj *pSql, bool initialParse) {
int32_t ret = TSDB_CODE_SUCCESS;
if (initialParse) {
assert(!pSql->cmd.parseFinished);
SSqlCmd* pCmd = &pSql->cmd;
char* p = pSql->sqlstr;
pSql->sqlstr = NULL;
tscPartiallyFreeSqlObj(pSql);
pSql->sqlstr = p;
} else if (!pSql->cmd.parseFinished) {
tscTrace("continue parse sql: %s", pSql->cmd.curSql);
if (!pCmd->parseFinished) {
tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql);
}
if (tscIsInsertData(pSql->sqlstr)) {
......@@ -1347,7 +1334,11 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
if (initialParse && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
return ret;
}
ret = tsParseInsertSql(pSql);
} else {
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
......@@ -1362,11 +1353,9 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
/*
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
* so do NOT use pRes->code to determine if the getTableMeta/getMetricMeta function
* invokes new threads to get data from mnode or simply retrieves data from cache.
*
* do NOT assign return code to pRes->code for the same reason since it may be released by another thread
* pRes->code = ret;
* so do NOT use pRes->code to determine if the getTableMeta function
* invokes new threads to get data from mgmt node or simply retrieves data from cache.
* do NOT assign return code to pRes->code for the same reason since it may be released by another thread already.
*/
return ret;
}
......
......@@ -1690,7 +1690,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
if (pItem->pNode->pParam->nExpr > 1 && strlen(pItem->aliasName) > 0) {
if (pItem->pNode->pParam->nExpr > 1 && (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg8);
}
......@@ -1761,7 +1761,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
int32_t numOfFields = 0;
// multicolumn selection does not support alias name
if (strlen(pItem->aliasName) != 0) {
if (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg8);
}
......@@ -2642,7 +2642,7 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType, false);
size_t len = wcslen((wchar_t*)pColumnFilter->pz);
size_t len = twcslen((wchar_t*)pColumnFilter->pz);
pColumnFilter->len = len * TSDB_NCHAR_SIZE;
} else {
tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType, false);
......
......@@ -488,7 +488,7 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p start to send msg to free qhandle in dnode, command:%s", pSql, sqlCmd[pCmd->command]);
tscTrace("%p send msg to dnode to free qhandle ASAP, command:%s", pSql, sqlCmd[pCmd->command]);
pSql->freed = 1;
tscProcessSql(pSql);
......@@ -510,18 +510,17 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
void taos_free_result(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
tscTrace("%p start to free result", res);
if (pSql == NULL || pSql->signature != pSql) {
tscTrace("%p result has been freed", pSql);
tscTrace("%p sqlObj has been freed", pSql);
return;
}
// The semaphore can not be changed while freeing async sub query objects.
SSqlRes *pRes = &pSql->res;
if (pRes == NULL || pRes->qhandle == 0) {
tscTrace("%p SqlObj is freed by app, qhandle is null", pSql);
tscFreeSqlObj(pSql);
tscTrace("%p SqlObj is freed by app, qhandle is null", pSql);
return;
}
......@@ -529,6 +528,7 @@ void taos_free_result(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
tscFreeSqlObj(pSql);
tscTrace("%p SqlObj is freed by app", pSql);
return;
}
......@@ -713,7 +713,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
......
......@@ -503,8 +503,10 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
int32_t code = doAsyncParseSql(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
sem_wait(&pSql->rspSem);
}
......
......@@ -1165,8 +1165,8 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
assert(pSql->res.numOfRows == 0);
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
if (pSql->pSubs == NULL) {
......@@ -1364,7 +1364,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL;
pRes->qhandle = 1; // hack the qhandle check
pRes->qhandle = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 16); // 64KB
......@@ -1845,34 +1845,41 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
} else {
// record the total inserted rows
if (numOfRows > 0 && tres != pParentObj) {
pParentObj->res.numOfRows += numOfRows;
}
if (taos_errno(tres) != 0) {
SSqlObj* pSql = (SSqlObj*) tres;
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
}
taos_free_result(tres);
// it is not the initial sqlObj, free it
if (tres != pParentObj) {
taos_free_result(tres);
} else {
assert(pParentObj->pSubs[0] == tres);
}
tfree(pSupporter);
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
return;
}
tscTrace("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows);
tfree(pState);
tfree(pSupporter);
// release data block data
tfree(pState);
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
......@@ -1886,9 +1893,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->qhandle = 1; // hack the qhandle check
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0);
......@@ -1896,52 +1901,84 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pState->numOfTotal;
pState->numOfRemain = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pSql->numOfSubs; ++i) {
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
pSql->fp = multiVnodeInsertFinalize;
pSql->param = pSupporter;
pSql->pSubs[0] = pSql; // the first sub insert points back to itself
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pSql, 0);
int32_t numOfSub = 1;
int32_t code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[0]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, 0,
pDataBlocks->nSize, code);
goto _error;
}
for (; numOfSub < pSql->numOfSubs; ++numOfSub) {
SInsertSupporter* pSupporter1 = calloc(1, sizeof(SInsertSupporter));
pSupporter1->pSql = pSql;
pSupporter1->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
goto _error;
}
/*
* assign the callback function to fetchFp to make sure that the error process function can restore
* the callback function (multiVnodeInsertMerge) correctly.
* the callback function (multiVnodeInsertFinalize) correctly.
*/
pNew->fetchFp = pNew->fp;
pSql->pSubs[i] = pNew;
pSql->pSubs[numOfSub] = pNew;
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, numOfSub,
pDataBlocks->nSize, code);
goto _error;
} else {
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
}
}
if (i < pSql->numOfSubs) {
if (numOfSub < pSql->numOfSubs) {
tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code; // free all allocated resource
}
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
// use the local variable
for (int32_t j = 0; j < numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j,
pDataBlocks->nSize, code);
}
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
_error:
// restore the udf fp
pSql->fp = pSql->fetchFp;
pSql->pSubs[0] = NULL;
tfree(pState);
tfree(pSql->param);
for(int32_t j = 1; j < numOfSub; ++j) {
tfree(pSql->pSubs[j]->param);
taos_free_result(pSql->pSubs[j]);
}
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
......
......@@ -184,7 +184,7 @@ int32_t tVariantToString(tVariant *pVar, char *dst) {
case TSDB_DATA_TYPE_NCHAR: {
dst[0] = '\'';
taosUcs4ToMbs(pVar->wpz, (wcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
int32_t len = strlen(dst);
dst[len] = '\'';
dst[len + 1] = 0;
......@@ -416,7 +416,7 @@ static int32_t toNchar(tVariant *pVariant, char **pDest, int32_t *pDestSize) {
}
pVariant->wpz = pWStr;
*pDestSize = wcslen(pVariant->wpz);
*pDestSize = twcslen(pVariant->wpz);
// shrink the allocate memory, no need to check here.
char* tmp = realloc(pVariant->wpz, (*pDestSize + 1)*TSDB_NCHAR_SIZE);
......
......@@ -66,7 +66,7 @@ static void _init_tvariant_nchar(tVariant* t) {
t->wpz = (wchar_t*)calloc(1, 20 * TSDB_NCHAR_SIZE);
t->nType = TSDB_DATA_TYPE_NCHAR;
wcscpy(t->wpz, L"-2000000.8765");
t->nLen = wcslen(t->wpz);
t->nLen = twcslen(t->wpz);
}
int main(int argc, char** argv) {
......
......@@ -119,6 +119,8 @@ extern "C" {
uint32_t taosRand(void);
size_t twcslen(const wchar_t *wcs);
int32_t strdequote(char *src);
size_t strtrim(char *src);
......
......@@ -57,6 +57,27 @@ uint32_t taosRand(void)
}
#endif
size_t twcslen(const wchar_t *wcs) {
#ifdef WINDOWS
int *wstr = (int *)wcs;
if (NULL == wstr) {
return 0;
}
size_t n = 0;
while (1) {
if (0 == *wstr++) {
break;
}
n++;
}
return n;
#else
return wcslen(wcs);
#endif
}
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
print("==============Case 1: add column, restart taosd, drop the same colum then add it back")
tdSql.execute(
"create table st(ts timestamp, speed int) tags(loc nchar(20))")
tdSql.execute(
"insert into t1 using st tags('beijing') values(now, 1)")
tdSql.execute(
"alter table st add column tbcol binary(20)")
# restart taosd
tdDnodes.forcestop(1)
tdDnodes.start(1)
tdSql.execute(
"alter table st drop column tbcol")
tdSql.execute(
"alter table st add column tbcol binary(20)")
tdSql.query("select * from st")
tdSql.checkRows(1)
print("==============Case 2: keep adding columns, restart taosd")
tdSql.execute(
"create table dt(ts timestamp, tbcol1 tinyint) tags(tgcol1 tinyint)")
tdSql.execute(
"alter table dt add column tbcol2 int")
tdSql.execute(
"alter table dt add column tbcol3 smallint")
tdSql.execute(
"alter table dt add column tbcol4 bigint")
tdSql.execute(
"alter table dt add column tbcol5 float")
tdSql.execute(
"alter table dt add column tbcol6 double")
tdSql.execute(
"alter table dt add column tbcol7 bool")
tdSql.execute(
"alter table dt add column tbcol8 nchar(20)")
tdSql.execute(
"alter table dt add column tbcol9 binary(20)")
# restart taosd
tdDnodes.forcestop(1)
tdDnodes.start(1)
tdSql.query("select * from st")
tdSql.checkRows(0)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -126,6 +126,9 @@ python3 ./test.py -f import_merge/importInsertThenImport.py
python3 ./test.py -f user/user_create.py
python3 ./test.py -f user/pass_len.py
# stable
python3 ./test.py -f stable/query_after_reset.py
# table
python3 ./test.py -f table/del_stable.py
......@@ -138,7 +141,11 @@ python3 ./test.py -f query/filterAllIntTypes.py
python3 ./test.py -f query/filterFloatAndDouble.py
python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py -f query/querySort.py
python3 ./test.py -f query/queryJoin.py
#stream
python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py
#alter table
python3 ./test.py -f alter/alter_table_crash.py
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import random
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class Test:
def __init__(self):
self.current_tb = ""
self.last_tb = ""
self.written = 0
def create_table(self):
tdLog.info("create a table")
self.current_tb = "tb%d" % int(round(time.time() * 1000))
tdLog.info("current table %s" % self.current_tb)
if (self.current_tb == self.last_tb):
return
else:
tdSql.execute(
'create table %s (ts timestamp, speed int)' %
self.current_tb)
self.last_tb = self.current_tb
self.written = 0
def insert_data(self):
tdLog.info("will insert data to table")
if (self.current_tb == ""):
tdLog.info("no table, create first")
self.create_table()
tdLog.info("insert data to table")
insertRows = 10
tdLog.info("insert %d rows to %s" % (insertRows, self.last_tb))
for i in range(0, insertRows):
ret = tdSql.execute(
'insert into %s values (now + %dm, %d)' %
(self.last_tb, i, i))
self.written = self.written + 1
tdLog.info("insert earlier data")
tdSql.execute('insert into %s values (now - 5m , 10)' % self.last_tb)
self.written = self.written + 1
tdSql.execute('insert into %s values (now - 6m , 10)' % self.last_tb)
self.written = self.written + 1
tdSql.execute('insert into %s values (now - 7m , 10)' % self.last_tb)
self.written = self.written + 1
tdSql.execute('insert into %s values (now - 8m , 10)' % self.last_tb)
self.written = self.written + 1
def query_data(self):
if (self.written > 0):
tdLog.info("query data from table")
tdSql.query("select * from %s" % self.last_tb)
tdSql.checkRows(self.written)
def query_stable(self):
tdLog.info("query super table")
tdSql.query("select * from st")
tdSql.checkRows(1)
def create_stable(self):
tdLog.info("create a super table and sub-table and insert data")
tdSql.execute(
"create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))")
tdSql.execute(
'CREATE TABLE if not exists dev_001 using st tags("dev_01")')
tdSql.execute(
"INSERT INTO dev_001(ts, tagtype) VALUES('2020-05-13 10:00:00.000', 1)")
def stop_database(self):
tdLog.info("stop databae")
tdDnodes.stop(1)
def restart_database(self):
tdLog.info("restart databae")
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.sleep(5)
def force_restart(self):
tdLog.info("force restart database")
tdDnodes.forcestop(1)
tdDnodes.start(1)
tdLog.sleep(5)
def drop_table(self):
if (self.current_tb != ""):
tdLog.info("drop current tb %s" % self.current_tb)
tdSql.execute("drop table %s" % self.current_tb)
self.current_tb = ""
self.last_tb = ""
self.written = 0
def reset_query_cache(self):
tdLog.info("reset query cache")
tdSql.execute("reset query cache")
tdLog.sleep(1)
def reset_database(self):
tdLog.info("reset database")
tdDnodes.forcestop(1)
tdDnodes.deploy(1)
self.current_tb = ""
self.last_tb = ""
self.written = 0
tdDnodes.start(1)
tdSql.prepare()
def delete_datafiles(self):
tdLog.info("delete data files")
dnodesDir = tdDnodes.getDnodesRootDir()
dataDir = dnodesDir + '/dnode1/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
self.current_tb = ""
self.last_tb = ""
self.written = 0
tdDnodes.start(1)
tdSql.prepare()
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
test = Test()
switch = {
1: test.create_table,
2: test.insert_data,
3: test.query_data,
4: test.create_stable,
5: test.restart_database,
6: test.force_restart,
7: test.drop_table,
8: test.reset_query_cache,
9: test.reset_database,
10: test.delete_datafiles,
11: test.query_stable,
12: test.stop_database,
}
switch.get(4, lambda: "ERROR")()
switch.get(12, lambda: "ERROR")()
switch.get(10, lambda: "ERROR")()
switch.get(5, lambda: "ERROR")()
switch.get(11, lambda: "ERROR")()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册