提交 0d94b0b9 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/wal

......@@ -1413,13 +1413,25 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error
assert(taos_errno(pSql) == code);
taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
do {
if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
assert(pSql->res.numOfRows == 0);
int32_t errc = fseek(fp, 0, SEEK_SET);
if (errc < 0) {
tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno));
} else {
break;
}
}
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
return;
taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
return;
} while (0);
}
// accumulate the total submit records
......
......@@ -890,13 +890,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
*((int16_t *)pMsg) = pCol->colId;
*((int16_t *)pMsg) = htons(pCol->colId);
pMsg += sizeof(pCol->colId);
*((int16_t *)pMsg) += pCol->colIndex;
*((int16_t *)pMsg) += htons(pCol->colIndex);
pMsg += sizeof(pCol->colIndex);
*((int16_t *)pMsg) += pCol->flag;
*((int16_t *)pMsg) += htons(pCol->flag);
pMsg += sizeof(pCol->flag);
memcpy(pMsg, pCol->name, tListLen(pCol->name));
......
......@@ -69,14 +69,17 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) {
}
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
if( sub == NULL)
if( sub == NULL) {
return;
}
SSub* pSub = (SSub*)sub;
SSubscriptionProgress target = {.uid = uid, .key = ts};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress);
if (p != NULL) {
p->key = ts;
tscDebug("subscribe:%s, uid:%"PRIu64" update sub start ts:%"PRId64, pSub->topic, p->uid, p->key);
}
}
......@@ -502,6 +505,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
}
if (pSub->pTimer == NULL) {
......
......@@ -164,7 +164,7 @@ extern float tsMinimalLogDirGB;
extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB;
extern int32_t tsTotalMemoryMB;
extern int32_t tsVersion;
extern uint32_t tsVersion;
// build info
extern char version[];
......
......@@ -204,7 +204,7 @@ float tsAvailDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 1.0f;
int32_t tsTotalMemoryMB = 0;
int32_t tsVersion = 0;
uint32_t tsVersion = 0;
// log
int32_t tsNumOfLogLines = 10000000;
......@@ -1478,15 +1478,20 @@ int32_t taosCheckGlobalCfg() {
// todo refactor
tsVersion = 0;
for (int i = 0; i < 10; i++) {
for (int ver = 0, i = 0; i < TSDB_VERSION_LEN; ++i) {
if (version[i] >= '0' && version[i] <= '9') {
tsVersion = tsVersion * 10 + (version[i] - '0');
ver = ver * 10 + (version[i] - '0');
} else if (version[i] == '.') {
tsVersion |= ver & 0xFF;
tsVersion <<= 8;
ver = 0;
} else if (version[i] == 0) {
tsVersion |= ver & 0xFF;
break;
}
}
tsVersion = 10 * tsVersion;
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp
......
......@@ -67,6 +67,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid re
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Client and server's time is not synchronized")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, 0, 0x0015, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, 0, 0x0016, "Invalid app version")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported")
......
......@@ -1722,6 +1722,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
}
// update the lastkey of current table for projection/aggregation query
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
......@@ -4299,7 +4303,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
int32_t total = 0;
STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL);
while(item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
......@@ -4307,9 +4313,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
total++;
qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key);
item = taosHashIterate(pQInfo->arrTableIdInfo, item);
}
qDebug("QInfo:%p set %d subscribe info", pQInfo, total);
// Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (pQInfo->runtimeEnv.stableQuery) {
......@@ -6214,13 +6225,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
(*groupbyCols)[i].colId = *(int16_t *)pMsg;
(*groupbyCols)[i].colId = htons(*(int16_t *)pMsg);
pMsg += sizeof((*groupbyCols)[i].colId);
(*groupbyCols)[i].colIndex = *(int16_t *)pMsg;
(*groupbyCols)[i].colIndex = htons(*(int16_t *)pMsg);
pMsg += sizeof((*groupbyCols)[i].colIndex);
(*groupbyCols)[i].flag = *(int16_t *)pMsg;
(*groupbyCols)[i].flag = htons(*(int16_t *)pMsg);
pMsg += sizeof((*groupbyCols)[i].flag);
memcpy((*groupbyCols)[i].name, pMsg, tListLen(groupbyCols[i]->name));
......
......@@ -20,10 +20,6 @@
extern "C" {
#endif
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
extern int tsRpcOverhead;
......@@ -58,6 +54,7 @@ typedef struct {
char empty[1]; // reserved
uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
uint32_t msgVer;
int32_t code; // code in response message
uint8_t content[0]; // message body starts from here
} SRpcHead;
......
......@@ -142,7 +142,6 @@ static int32_t tsRpcNum = 0;
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpConnection,
......@@ -959,6 +958,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL;
}
if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, taosMsg[pHead->msgType]);
terrno = TSDB_CODE_RPC_INVALID_VERSION; return NULL;
}
pConn = rpcGetConnObj(pRpc, sid, pRecv);
if (pConn == NULL) {
tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
......@@ -1212,6 +1216,7 @@ static void rpcSendReqHead(SRpcConn *pConn) {
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgType = pConn->outType;
pHead->msgVer = htonl(tsVersion >> 8);
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = pConn->outTranId;
......@@ -1282,6 +1287,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
// set the message header
pHead->version = 1;
pHead->msgVer = htonl(tsVersion >> 8);
pHead->msgType = msgType;
pHead->encrypt = 0;
pConn->tranId++;
......
#ifndef _TS_BUILD_H_
#define _TS_BUILD_H_
extern const char tsVersion[];
extern const char tsBuildInfo[];
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册