提交 40016c5e 编写于 作者: H Haojun Liao

[TD-2361]<enhance>: optimize the failure retry in insert processing.

上级 94d96109
...@@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); ...@@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql); int32_t tscHandleInsertRetry(SSqlObj* parent, SSqlObj* child);
void tscBuildResFromSubqueries(SSqlObj *pSql); void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql); TAOS_ROW doSetResultRowData(SSqlObj *pSql);
......
...@@ -37,40 +37,6 @@ extern "C" { ...@@ -37,40 +37,6 @@ extern "C" {
#include "qTsbuf.h" #include "qTsbuf.h"
#include "tcmdtype.h" #include "tcmdtype.h"
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return malloc(__size);
}
}
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return calloc(num, __size);
}
}
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
uint32_t v = rand();
if (v % 5000 <= 0) {
return NULL;
} else {
return realloc(p, __size);
}
}
#define calloc u_calloc
#define malloc u_malloc
#define realloc u_realloc
#endif
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
struct SLocalReducer; struct SLocalReducer;
...@@ -78,7 +44,7 @@ struct SLocalReducer; ...@@ -78,7 +44,7 @@ struct SLocalReducer;
// data source from sql string or from file // data source from sql string or from file
enum { enum {
DATA_FROM_SQL_STRING = 1, DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2, DATA_FROM_DATA_FILE = 2,
}; };
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
...@@ -118,10 +84,10 @@ typedef struct STableMetaInfo { ...@@ -118,10 +84,10 @@ typedef struct STableMetaInfo {
* 1. keep the vgroup index during the multi-vnode super table projection query * 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion * 2. keep the vgroup index for multi-vnode insertion
*/ */
int32_t vgroupIndex; int32_t vgroupIndex;
char name[TSDB_TABLE_FNAME_LEN]; // (super) table name char name[TSDB_TABLE_FNAME_LEN]; // (super) table name
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray* tagColList; // SArray<SColumn*>, involved tag columns SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo; } STableMetaInfo;
/* the structure for sql function in select clause */ /* the structure for sql function in select clause */
...@@ -204,22 +170,17 @@ typedef struct SParamInfo { ...@@ -204,22 +170,17 @@ typedef struct SParamInfo {
} SParamInfo; } SParamInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_FNAME_LEN]; char tableId[TSDB_TABLE_FNAME_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table int32_t rowSize; // row size for current table
uint32_t nAllocSize; uint32_t nAllocSize;
uint32_t headerSize; // header for table info (uid, tid, submit metadata) uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size; uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
/*
* the table meta of table, the table meta will be used during submit, keep a ref
* to avoid it to be removed from cache
*/
STableMeta *pTableMeta;
char *pData; char *pData;
// for parameter ('?') binding // for parameter ('?') binding
...@@ -284,10 +245,14 @@ typedef struct { ...@@ -284,10 +245,14 @@ typedef struct {
int32_t numOfParams; int32_t numOfParams;
int8_t dataSourceType; // load data from file or not int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema int8_t submitSchema; // submit block is built with table schema
STagData *pTagData; // NOTE: pTagData->data is used as a variant length array STagData *pTagData; // NOTE: pTagData->data is used as a variant length array
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
} SSqlCmd; } SSqlCmd;
typedef struct SResRec { typedef struct SResRec {
......
...@@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code)); tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
goto _error; goto _error;
} else {
tscDebug("%p get %s successfully", pSql, msg);
} }
tscDebug("%p get %s successfully", pSql, msg);
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// check if it is a sub-query of super table query first, if true, enter another routine // check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql); tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
// param already freed by other routine and pSql in tscCache when ctrl + c
if (atomic_load_ptr(&pSql->param) == NULL) {
return;
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSql;
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
// tscProcessSql can add error into async res
tscProcessSql(pSql);
return;
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding tid_tag query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} else {
assert(code == TSDB_CODE_SUCCESS);
} }
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
// tscProcessSql can add error into async res // tscProcessSql can add error into async res
tscProcessSql(pSql); tscProcessSql(pSql);
return; return;
...@@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} else {
assert(code == TSDB_CODE_SUCCESS);
} }
assert(pCmd->command != TSDB_SQL_INSERT);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons: // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema. // 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql); tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false; pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd, false); tscResetSqlCmdObj(pCmd, false);
...@@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error; goto _error;
} }
if (pCmd->command == TSDB_SQL_INSERT) { tscProcessSql(pSql);
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
tscHandleInsertRetry(pSql);
} else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue
tscProcessSql(pSql);
}
}else { // in all other cases, simple retry }else { // in all other cases, simple retry
tscProcessSql(pSql); tscProcessSql(pSql);
} }
...@@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (!pSql->cmd.parseFinished) { if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
} }
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
return; return;
......
...@@ -728,7 +728,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -728,7 +728,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSchema p1 = {0}; SSchema p1 = {0};
if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
p1 = tGetTableNameColumnSchema(); p1 = tGetTableNameColumnSchema();
} else if (pExpr->colInfo.colIndex == TSDB_UD_COLUMN_INDEX) { } else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
p1.bytes = pExpr->resBytes; p1.bytes = pExpr->resBytes;
p1.type = pExpr->resType; p1.type = pExpr->resType;
tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name)); tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name));
......
...@@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { ...@@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
} }
} }
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd, static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) {
int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pCmd->pTableList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -1060,16 +1057,15 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1060,16 +1057,15 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (NULL == pCmd->pTableList) { if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); if (NULL == pCmd->pTableList) {
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _clean;
} }
} else { } else {
str = pCmd->curSql; str = pCmd->curSql;
} }
tscDebug("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList); tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableList);
while (1) { while (1) {
int32_t index = 0; int32_t index = 0;
...@@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
*/ */
if (totalNum == 0) { if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL; code = TSDB_CODE_TSC_INVALID_SQL;
goto _error; goto _clean;
} else { } else {
break; break;
} }
...@@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) {
// Check if the table name available or not // Check if the table name available or not
if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) { if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) {
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
goto _error; goto _clean;
} }
if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
...@@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscError("%p async insert parse error, code:%s", pSql, tstrerror(code)); tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
pCmd->curSql = NULL; pCmd->curSql = NULL;
goto _error; goto _clean;
} }
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error; goto _clean;
} }
index = 0; index = 0;
...@@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (sToken.n == 0) { if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
goto _error; goto _clean;
} }
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
...@@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
/* /*
* app here insert data in different vnodes, so we need to set the following * app here insert data in different vnodes, so we need to set the following
* data in another submit procedure using async insert routines * data in another submit procedure using async insert routines
*/ */
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
} else if (sToken.type == TK_FILE) { } else if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
index = 0; index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL); sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.type != TK_STRING && sToken.type != TK_ID) { if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error; goto _clean;
} }
str += index; str += index;
if (sToken.n == 0) { if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error; goto _clean;
} }
strncpy(pCmd->payload, sToken.z, sToken.n); strncpy(pCmd->payload, sToken.z, sToken.n);
...@@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
wordexp_t full_path; wordexp_t full_path;
if (wordexp(pCmd->payload, &full_path, 0) != 0) { if (wordexp(pCmd->payload, &full_path, 0) != 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
goto _error; goto _clean;
} }
tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize); tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
...@@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
SSchema * pSchema = tscGetTableSchema(pTableMeta); SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
SParsedDataColInfo spd = {0}; SParsedDataColInfo spd = {0};
...@@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (spd.hasVal[t] == true) { if (spd.hasVal[t] == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _error; goto _clean;
} }
spd.hasVal[t] = true; spd.hasVal[t] = true;
...@@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (!findColumnIndex) { if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
goto _error; goto _clean;
} }
} }
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
goto _error; goto _clean;
} }
index = 0; index = 0;
...@@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (sToken.type != TK_VALUES) { if (sToken.type != TK_VALUES) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
goto _error; goto _clean;
} }
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); code = doParseInsertStatement(pCmd, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
} else { } else {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
goto _error; goto _clean;
} }
} }
...@@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean; goto _clean;
} }
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId if (taosHashGetSize(pCmd->pTableList) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
goto _error; goto _clean;
} }
} }
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto _clean; goto _clean;
_error:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean: _clean:
taosHashCleanup(pCmd->pTableList); pCmd->curSql = NULL;
pCmd->pTableList = NULL;
pCmd->curSql = NULL;
pCmd->parseFinished = 1; pCmd->parseFinished = 1;
return code; return code;
} }
...@@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
pSql->parseRetry++; pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
} }
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
} }
...@@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL); return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
} }
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { if (taosHashGetSize(pCmd->pTableList) > 0) {
// merge according to vgid // merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); int code = tscMergeTableDataBlocks(stmt->pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -280,19 +280,18 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -280,19 +280,18 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
int32_t cmd = pCmd->command; int32_t cmd = pCmd->command;
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
......
...@@ -2163,23 +2163,76 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2163,23 +2163,76 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
assert(pSql != NULL && pSql->res.code == numOfRows); assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code; pParentObj->res.code = pSql->res.code;
}
tfree(pSupporter); // set the flag in the parent sqlObj
if (pSql->cmd.submitSchema) {
pParentObj->cmd.submitSchema = 1;
}
}
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) {
return; return;
} }
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// restore user defined fp // restore user defined fp
pParentObj->fp = pParentObj->fetchFp; pParentObj->fp = pParentObj->fetchFp;
int32_t numOfSub = pParentObj->subState.numOfSub;
if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
for(int32_t i = 0; i < numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
tfree(pSql->param);
}
// todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else {
int32_t numOfFailed = 0;
for(int32_t i = 0; i < numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
if (pSql->res.code != TSDB_CODE_SUCCESS) {
numOfFailed += 1;
// clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, true);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
}
}
tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
pParentObj->res.numOfRows, numOfFailed, numOfSub);
tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true);
}
pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed;
pParentObj->subState.numOfSub = numOfFailed;
// todo remove this parameter in async callback function definition. tscResetSqlCmdObj(&pParentObj->cmd, false);
// all data has been sent to vnode, call user function
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; tscDebug("%p re-parse sql to generate data", pParentObj);
(*pParentObj->fp)(pParentObj->param, pParentObj, v); int32_t code = tsParseSql(pParentObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
pParentObj->res.code = code;
tscQueueAsyncRes(pParentObj);
return;
}
tscDoQuery(pParentObj);
}
} }
/** /**
...@@ -2187,19 +2240,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2187,19 +2240,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
* @param pSql * @param pSql
* @return * @return
*/ */
int32_t tscHandleInsertRetry(SSqlObj* pSql) { int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
assert(pSql != NULL && pSql->param != NULL); assert(pSql != NULL && pSql->param != NULL);
SSqlCmd* pCmd = &pSql->cmd; // SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index);
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
// free the data block created from insert sql string // free the data block created from insert sql string
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); // pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
...@@ -2213,6 +2266,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2213,6 +2266,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
// it is the failure retry insert
if (pSql->pSubs != NULL) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i);
if (pSub->res.code != TSDB_CODE_SUCCESS) {
tscHandleInsertRetry(pSql, pSub);
}
}
return TSDB_CODE_SUCCESS;
}
pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks);
assert(pSql->subState.numOfSub > 0); assert(pSql->subState.numOfSub > 0);
......
...@@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
} else { } else if (varDataLen(p) > 0) {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst)); int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length); varDataSetLen(dst, length);
if (length == 0) { if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
} }
} else {
varDataSetLen(dst, 0);
} }
p += pInfo->field.bytes; p += pInfo->field.bytes;
...@@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
} }
static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
if (pCmd == NULL || pCmd->numOfClause == 0) { if (pCmd == NULL || pCmd->numOfClause == 0) {
return; return;
} }
...@@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { ...@@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->msgType = 0; pCmd->msgType = 0;
pCmd->parseFinished = 0; pCmd->parseFinished = 0;
pCmd->autoCreated = 0; pCmd->autoCreated = 0;
pCmd->numOfTables = 0;
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL; tfree(pCmd->pTableMetaList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
pCmd->pTableList = tscDestroyBlockHashTable(pCmd->pTableList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd, removeFromCache); tscFreeQueryInfo(pCmd, removeFromCache);
} }
...@@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { ...@@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
return NULL; return NULL;
} }
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable) {
if (pBlockHashTable == NULL) {
return NULL;
}
STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL);
while(p) {
tscDestroyDataBlock(*p);
p = taosHashIterate(pBlockHashTable, p);
}
taosHashCleanup(pBlockHashTable);
return NULL;
}
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
assert(pDataBlock->pTableMeta != NULL); assert(pDataBlock->pTableMeta != NULL);
...@@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) {
STableDataBlocks** dataBlocks) {
*dataBlocks = NULL; *dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
...@@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t ...@@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
} }
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
taosArrayPush(pDataBlockList, dataBlocks); if (pBlockList) {
taosArrayPush(pBlockList, dataBlocks);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result; return result;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { static void extractTableMeta(SSqlCmd* pCmd) {
pCmd->numOfTables = taosHashGetSize(pCmd->pTableList);
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
p1 = taosHashIterate(pCmd->pTableList, p1);
}
pCmd->pTableList = tscDestroyBlockHashTable(pCmd->pTableList);
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList); STableDataBlocks** p = taosHashIterate(pCmd->pTableList, NULL);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = *p;
while(pOneTableBlock) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
...@@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
// the length does not include the SSubmitBlk structure // the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen); pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
p = taosHashIterate(pCmd->pTableList, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
} }
tscDestroyBlockArrayList(pTableDataBlockList); extractTableMeta(pCmd);
// free the table data blocks; // free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList; pCmd->pDataBlocks = pVnodeDataBlockList;
// tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册