提交 b96f7962 编写于 作者: S slguan

remove duplicate data in import sql

上级 e5b121d6
...@@ -55,7 +55,7 @@ SDataBlockList* tscCreateBlockArrayList(); ...@@ -55,7 +55,7 @@ SDataBlockList* tscCreateBlockArrayList();
void* tscDestroyBlockArrayList(SDataBlockList* pList); void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pDataList); void tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId); int32_t startOffset, int32_t rowSize, char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name); STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
...@@ -140,7 +140,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd); ...@@ -140,7 +140,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql); bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscDoQuery(SSqlObj* pSql); void tscDoQuery(SSqlObj* pSql);
int32_t sortRemoveDuplicates(STableDataBlocks* dataBuf, int32_t numOfRows); void sortRemoveDuplicates(STableDataBlocks* dataBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -518,18 +518,20 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) { ...@@ -518,18 +518,20 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) {
const int factor = 5; const int factor = 5;
// expand the allocated size // expand the allocated size
while (remain < rowSize * factor) { if (remain < rowSize * factor) {
pDataBlock->nAllocSize = (uint32_t) (pDataBlock->nAllocSize * 1.5); while (remain < rowSize * factor) {
remain = pDataBlock->nAllocSize - pDataBlock->size; pDataBlock->nAllocSize = (uint32_t) (pDataBlock->nAllocSize * 1.5);
} remain = pDataBlock->nAllocSize - pDataBlock->size;
}
char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
if (tmp != NULL) { if (tmp != NULL) {
pDataBlock->pData = tmp; pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else { } else {
assert(false); assert(false);
// do nothing // do nothing
}
} }
return (int32_t)(pDataBlock->nAllocSize - pDataBlock->size) / rowSize; return (int32_t)(pDataBlock->nAllocSize - pDataBlock->size) / rowSize;
...@@ -542,16 +544,21 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterM ...@@ -542,16 +544,21 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterM
pBlocks->numOfRows += numOfRows; pBlocks->numOfRows += numOfRows;
} }
int32_t sortRemoveDuplicates(STableDataBlocks *dataBuf, int32_t numOfRows) { // data block is disordered, sort it in ascending order
// data block is disordered, sort it in ascending order void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet.
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);
if (!dataBuf->ordered) { if (!dataBuf->ordered) {
char *pBlockData = dataBuf->pData + sizeof(SShellSubmitBlock); char *pBlockData = pBlocks->payLoad;
qsort(pBlockData, numOfRows, dataBuf->rowSize, rowDataCompar); qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0; int32_t i = 0;
int32_t j = 1; int32_t j = 1;
while (j < numOfRows) { while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
...@@ -568,11 +575,11 @@ int32_t sortRemoveDuplicates(STableDataBlocks *dataBuf, int32_t numOfRows) { ...@@ -568,11 +575,11 @@ int32_t sortRemoveDuplicates(STableDataBlocks *dataBuf, int32_t numOfRows) {
++j; ++j;
} }
numOfRows = i + 1;
dataBuf->ordered = true; dataBuf->ordered = true;
}
return numOfRows; pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize*pBlocks->numOfRows;
}
} }
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd, static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
...@@ -998,7 +1005,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) { ...@@ -998,7 +1005,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
// submit to more than one vnode // submit to more than one vnode
if (pCmd->pDataBlocks->nSize > 0) { if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgid // merge according to vgid
tscMergeTableDataBlocks(pCmd, pCmd->pDataBlocks); tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
...@@ -1033,8 +1040,6 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) { ...@@ -1033,8 +1040,6 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
int code = TSDB_CODE_INVALID_SQL; int code = TSDB_CODE_INVALID_SQL;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
tscCleanSqlCmd(pCmd);
sql = tscGetToken(sql, &verb, &verblen); sql = tscGetToken(sql, &verb, &verblen);
if (verblen) { if (verblen) {
...@@ -1055,6 +1060,7 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) { ...@@ -1055,6 +1060,7 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) { int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
tscCleanSqlCmd(&pSql->cmd);
if (tscIsInsertOrImportData(pSql->sqlstr)) { if (tscIsInsertOrImportData(pSql->sqlstr)) {
/* /*
...@@ -1074,6 +1080,9 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) { ...@@ -1074,6 +1080,9 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
} else { } else {
SSqlInfo SQLInfo = {0}; SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr); tSQLParse(&SQLInfo, pSql->sqlstr);
tscAllocPayloadWithSize(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
} }
...@@ -1098,7 +1107,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, int32_t numOfRows, STableDataBlock ...@@ -1098,7 +1107,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, int32_t numOfRows, STableDataBlock
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData); SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows); tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
tscMergeTableDataBlocks(pCmd, pCmd->pDataBlocks); tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
// the pDataBlock is different from the pTableDataBlocks // the pDataBlock is different from the pTableDataBlocks
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
......
...@@ -143,9 +143,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -143,9 +143,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
tscCleanSqlCmd(pCmd);
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
// transfer pInfo into select operation // transfer pInfo into select operation
switch (pInfo->sqlType) { switch (pInfo->sqlType) {
case DROP_TABLE: case DROP_TABLE:
...@@ -785,7 +782,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -785,7 +782,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// set sliding value // set sliding value
SSQLToken* pSliding = &pQuerySql->sliding; SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) { if (pSliding->n != 0) {
if (!tscEmbedded) { // pCmd->count == 1 means sql in stream function
if (!tscEmbedded && pCmd->count == 0) {
const char* msg = "not support sliding in query"; const char* msg = "not support sliding in query";
setErrMsg(pCmd, msg); setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
......
...@@ -287,14 +287,20 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -287,14 +287,20 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
pSql->thandle = NULL; pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (UTIL_METER_IS_METRIC(pCmd) && if (UTIL_METER_IS_METRIC(pCmd) && pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION) {
(pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID || pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION)) {
/* /*
* for metric query, in case of any meter missing during query, sub-query of metric query will failed, * for metric query, in case of any meter missing during query, sub-query of metric query will failed,
* causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app * causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app
*/ */
tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]); tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]);
code = TSDB_CODE_METRICMETA_EXPIRED; code = TSDB_CODE_METRICMETA_EXPIRED;
} else if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
/*
* session id is invalid(e.g., less than 0 or larger than maximum session per
* vnode) in submit/query msg, no retry
*/
code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) { } else if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL; code = TSDB_CODE_NETWORK_UNAVAIL;
} else if (pCmd->command == TSDB_SQL_HB) { } else if (pCmd->command == TSDB_SQL_HB) {
...@@ -1027,8 +1033,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { ...@@ -1027,8 +1033,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.count); /* number of meters to be inserted */ pShellMsg->numOfSid = htonl(pSql->cmd.count); /* number of meters to be inserted */
pMsg += sizeof(SShellSubmitMsg);
/* /*
* pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here * pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
*/ */
......
...@@ -72,7 +72,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi ...@@ -72,7 +72,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi
pObj->signature = pObj; pObj->signature = pObj;
strncpy(pObj->user, user, TSDB_USER_LEN); strncpy(pObj->user, user, TSDB_USER_LEN);
taosEncryptPass(pass, strlen(pass), pObj->pass); taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
pObj->mgmtPort = port ? port : tsMgmtShellPort; pObj->mgmtPort = port ? port : tsMgmtShellPort;
if (db) { if (db) {
......
...@@ -145,13 +145,14 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -145,13 +145,14 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql, int32_t numOfRows) { static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql, int32_t numOfRows) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int64_t timestamp = *(int64_t *)pRes->data; int64_t timestamp = *(int64_t *)pRes->data;
int64_t actualTimestamp = pStream->stime - pStream->interval;
if (timestamp != pStream->stime) { if (timestamp != actualTimestamp) {
// reset the timestamp of each agg point by using start time of each interval // reset the timestamp of each agg point by using start time of each interval
*((int64_t *)pRes->data) = pStream->stime - pStream->interval; *((int64_t *)pRes->data) = actualTimestamp;
tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, actualTimestamp);
pStream->stime - pStream->interval);
} }
} }
...@@ -397,7 +398,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -397,7 +398,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now if (stime == 0) { // no data in meter till now
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime, stime); tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime);
} else { } else {
int64_t newStime = (stime / pStream->interval) * pStream->interval; int64_t newStime = (stime / pStream->interval) * pStream->interval;
if (newStime != stime) { if (newStime != stime) {
...@@ -435,13 +436,25 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { ...@@ -435,13 +436,25 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
} }
static void setErrorInfo(STscObj* pObj, int32_t code, char* info) {
if (pObj == NULL) {
return;
}
SSqlCmd* pCmd = &pObj->pSql->cmd;
pObj->pSql->res.code = code;
strncpy(pCmd->payload, info, pCmd->payloadLen);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL; if (pObj == NULL || pObj->signature != pObj) return NULL;
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { // todo set corect error msg if (pSql == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
return NULL; return NULL;
} }
...@@ -451,22 +464,31 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, ...@@ -451,22 +464,31 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); pSql->sqlstr = strdup(sqlstr);
if (pSql->sqlstr == NULL) { // todo set corect error msg if (pSql->sqlstr == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tfree(pSql); tfree(pSql);
return NULL; return NULL;
} }
strcpy(pSql->sqlstr, sqlstr);
sem_init(&pSql->rspSem, 0, 0); sem_init(&pSql->rspSem, 0, 0);
sem_init(&pSql->emptyRspSem, 0, 1); sem_init(&pSql->emptyRspSem, 0, 1);
SSqlInfo SQLInfo = {0}; SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr); tSQLParse(&SQLInfo, pSql->sqlstr);
tscCleanSqlCmd(&pSql->cmd);
tscAllocPayloadWithSize(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
//todo refactor later
pSql->cmd.count = 1;
pRes->code = tscToSQLCmd(pSql, &SQLInfo); pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pObj, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
...@@ -474,6 +496,8 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, ...@@ -474,6 +496,8 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) { if (pStream == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
......
...@@ -401,7 +401,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { ...@@ -401,7 +401,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
} }
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) {
STableDataBlocks *dataBuf = tscCreateDataBlock(size); STableDataBlocks* dataBuf = tscCreateDataBlock(size);
dataBuf->rowSize = rowSize; dataBuf->rowSize = rowSize;
dataBuf->size = startOffset; dataBuf->size = startOffset;
...@@ -419,7 +419,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData ...@@ -419,7 +419,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
} }
if (dataBuf == NULL) { if (dataBuf == NULL) {
dataBuf = tscCreateDataBlockEx((size_t) size, rowSize, startOffset, tableId); dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf);
tscAppendDataBlock(pDataBlockList, dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf);
} }
...@@ -427,7 +427,8 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData ...@@ -427,7 +427,8 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
return dataBuf; return dataBuf;
} }
void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pTableDataBlockList) { void tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosInitIntHash(8, sizeof(void*), taosHashInt); void* pVnodeDataBlockHashList = taosInitIntHash(8, sizeof(void*), taosHashInt);
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList(); SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();
...@@ -453,9 +454,10 @@ void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pTableDataBlockList) ...@@ -453,9 +454,10 @@ void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pTableDataBlockList)
} }
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData; SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData;
assert(pBlocks->numOfRows * pOneTableBlock->rowSize + sizeof(SShellSubmitBlock) == pOneTableBlock->size); sortRemoveDuplicates(pOneTableBlock);
pBlocks->numOfRows = (int16_t)sortRemoveDuplicates(pOneTableBlock, pBlocks->numOfRows); tscTrace("%p meterId:%s, sid:%d, rows:%d, sversion:%d", pSql, pOneTableBlock->meterId, pBlocks->sid,
pBlocks->numOfRows, pBlocks->sversion);
pBlocks->sid = htonl(pBlocks->sid); pBlocks->sid = htonl(pBlocks->sid);
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
...@@ -883,7 +885,7 @@ static int32_t validateQuoteToken(SSQLToken* pToken) { ...@@ -883,7 +885,7 @@ static int32_t validateQuoteToken(SSQLToken* pToken) {
if (pToken->type == TK_STRING) { if (pToken->type == TK_STRING) {
return tscValidateName(pToken); return tscValidateName(pToken);
} }
if (k != pToken->n || pToken->type != TK_ID) { if (k != pToken->n || pToken->type != TK_ID) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
...@@ -906,16 +908,16 @@ int32_t tscValidateName(SSQLToken* pToken) { ...@@ -906,16 +908,16 @@ int32_t tscValidateName(SSQLToken* pToken) {
int len = tSQLGetToken(pToken->z, &pToken->type); int len = tSQLGetToken(pToken->z, &pToken->type);
// single token, validate it // single token, validate it
if (len == pToken->n){ if (len == pToken->n) {
return validateQuoteToken(pToken); return validateQuoteToken(pToken);
} else { } else {
sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n); sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n);
if (sep == NULL) { if (sep == NULL) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
return tscValidateName(pToken); return tscValidateName(pToken);
} }
} else { } else {
if (isNumber(pToken)) { if (isNumber(pToken)) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
...@@ -927,7 +929,7 @@ int32_t tscValidateName(SSQLToken* pToken) { ...@@ -927,7 +929,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
if (pToken->type == TK_SPACE) { if (pToken->type == TK_SPACE) {
strtrim(pToken->z); strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z); pToken->n = (uint32_t)strlen(pToken->z);
} }
pToken->n = tSQLGetToken(pToken->z, &pToken->type); pToken->n = tSQLGetToken(pToken->z, &pToken->type);
......
...@@ -512,6 +512,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -512,6 +512,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
char * pData; char * pData;
TSKEY tsKey; TSKEY tsKey;
int cfile;
int points = 0; int points = 0;
int code = TSDB_CODE_SUCCESS; int code = TSDB_CODE_SUCCESS;
SVnodeObj * pVnode = vnodeList + pObj->vnode; SVnodeObj * pVnode = vnodeList + pObj->vnode;
...@@ -528,6 +529,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -528,6 +529,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
// to guarantee time stamp is the same for all vnodes // to guarantee time stamp is the same for all vnodes
pData = pSubmit->payLoad; pData = pSubmit->payLoad;
tsKey = taosGetTimestamp(pVnode->cfg.precision); tsKey = taosGetTimestamp(pVnode->cfg.precision);
cfile = tsKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
if (*((TSKEY *)pData) == 0) { if (*((TSKEY *)pData) == 0) {
for (i = 0; i < numOfPoints; ++i) { for (i = 0; i < numOfPoints; ++i) {
*((TSKEY *)pData) = tsKey++; *((TSKEY *)pData) = tsKey++;
...@@ -570,9 +572,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -570,9 +572,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
code = 0; code = 0;
TSKEY firstKey = *((TSKEY *)pData); TSKEY firstKey = *((TSKEY *)pData);
if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + firstKey) { int firstId = firstKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to insert, key:%lld", pObj->vnode, pObj->sid, int lastId = (*(TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
pObj->meterId, pVnode->lastKeyOnFile, firstKey); if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) {
dError("vid:%d sid:%d id:%s, invalid timestamp to insert, firstKey: %ld lastKey: %ld ", pObj->vnode, pObj->sid,
pObj->meterId, firstKey, (*(TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1))));
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
} }
...@@ -582,7 +586,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -582,7 +586,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId, dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state); pObj->state);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
break; break;
} }
......
...@@ -248,7 +248,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -248,7 +248,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
} }
if (pQueryMsg->numOfSids <= 0) { if (pQueryMsg->numOfSids <= 0) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over; goto _query_over;
} }
...@@ -263,7 +263,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -263,7 +263,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0) { if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode); dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode); vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _query_over; goto _query_over;
} }
...@@ -274,13 +274,13 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -274,13 +274,13 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pQueryMsg->pSidExtInfo == 0) { if (pQueryMsg->pSidExtInfo == 0) {
dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg); dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over; goto _query_over;
} }
if (pVnode->meterList == NULL) { if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode); dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _query_over; goto _query_over;
} }
...@@ -448,7 +448,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -448,7 +448,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pSubmit->numOfSid <= 0) { if (pSubmit->numOfSid <= 0) {
dError("invalid num of meters:%d", pSubmit->numOfSid); dError("invalid num of meters:%d", pSubmit->numOfSid);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _submit_over; goto _submit_over;
} }
...@@ -462,7 +462,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -462,7 +462,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) { if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
dError("vid:%d is not activated for submit", pSubmit->vnode); dError("vid:%d is not activated for submit", pSubmit->vnode);
vnodeSendVpeerCfgMsg(pSubmit->vnode); vnodeSendVpeerCfgMsg(pSubmit->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _submit_over; goto _submit_over;
} }
......
...@@ -13,8 +13,9 @@ ...@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "taosmsg.h"
#include "vnode.h" #include "vnode.h"
#include <taosmsg.h> #include "vnodeUtil.h"
/* static TAOS *dbConn = NULL; */ /* static TAOS *dbConn = NULL; */
void vnodeCloseStreamCallback(void *param); void vnodeCloseStreamCallback(void *param);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册