提交 3cf86c92 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/table

...@@ -90,7 +90,6 @@ pipeline { ...@@ -90,7 +90,6 @@ pipeline {
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh ''' sh '''
cd ${WKC}/tests/pytest cd ${WKC}/tests/pytest
./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4
./handle_crash_gen_val_log.sh ./handle_crash_gen_val_log.sh
''' '''
} }
......
...@@ -330,6 +330,7 @@ typedef struct STscObj { ...@@ -330,6 +330,7 @@ typedef struct STscObj {
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
uint32_t connId; uint32_t connId;
uint64_t rid; // ref ID returned by taosAddRef
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
...@@ -348,7 +349,7 @@ typedef struct SSqlObj { ...@@ -348,7 +349,7 @@ typedef struct SSqlObj {
void *signature; void *signature;
pthread_t owner; // owner of sql object, by which it is executed pthread_t owner; // owner of sql object, by which it is executed
STscObj *pTscObj; STscObj *pTscObj;
void *pRpcCtx; int64_t rpcRid;
void (*fp)(); void (*fp)();
void (*fetchFp)(); void (*fetchFp)();
void *param; void *param;
......
...@@ -428,6 +428,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -428,6 +428,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else { } else {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
// param already freed by other routine and pSql in tscCache when ctrl + c // param already freed by other routine and pSql in tscCache when ctrl + c
if (atomic_load_ptr(&pSql->param) == NULL) { if (atomic_load_ptr(&pSql->param) == NULL) {
return; return;
...@@ -441,6 +442,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -441,6 +442,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL); pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
// tscProcessSql can add error into async res
tscProcessSql(pSql);
return;
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding tid_tag query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
// tscProcessSql can add error into async res // tscProcessSql can add error into async res
tscProcessSql(pSql); tscProcessSql(pSql);
return; return;
...@@ -465,7 +480,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -465,7 +480,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscResetSqlCmdObj(pCmd, false); tscResetSqlCmdObj(pCmd, false);
code = tsParseSql(pSql, true); code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} else if (code != TSDB_CODE_SUCCESS) { } else if (code != TSDB_CODE_SUCCESS) {
......
...@@ -97,6 +97,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc ...@@ -97,6 +97,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[2].i64Key = pQueryInfo->order.order; pCtx->param[2].i64Key = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64Key = pQueryInfo->order.orderColId; pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
pCtx->param[0].nType = pExpr->param[0].nType;
} }
pCtx->interBufBytes = pExpr->interBytes; pCtx->interBufBytes = pExpr->interBytes;
......
...@@ -1309,7 +1309,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1309,7 +1309,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if ((!pCmd->parseFinished) && (!initial)) { if ((!pCmd->parseFinished) && (!initial)) {
tscDebug("%p resume to parse sql: %s", pSql, pCmd->curSql); tscDebug("%p resume to parse sql: %s", pSql, pCmd->curSql);
} }
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != ret) {
return ret; return ret;
......
...@@ -5311,6 +5311,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { ...@@ -5311,6 +5311,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->replications = pCreateDb->replica; pMsg->replications = pCreateDb->replica;
pMsg->quorum = pCreateDb->quorum; pMsg->quorum = pCreateDb->quorum;
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update;
} }
int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
......
...@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
int32_t waitingDuring = tsShellActivityTimer * 500; int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
} else { } else {
tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
} }
} }
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle; int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid);
int ret = taosAcquireRef(tscRefId, pObj); if (pObj == NULL) return;
if (ret < 0) {
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
return;
}
SSqlObj* pHB = pObj->pHb; SSqlObj* pHB = pObj->pHb;
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) { if (p == NULL) {
tscWarn("%p HB object has been released already", pHB); tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj); taosReleaseRef(tscRefId, pObj->rid);
return; return;
} }
...@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} }
taosReleaseRef(tscRefId, pObj); taosReleaseRef(tscRefId, rid);
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
...@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen, .contLen = pSql->cmd.payloadLen,
.ahandle = pSql, .ahandle = pSql,
.handle = &pSql->pRpcCtx, .handle = NULL,
.code = 0 .code = 0
}; };
...@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql); assert(*pSql->self == pSql);
pSql->pRpcCtx = NULL; pSql->rpcRid = -1;
if (pObj->signature != pObj) { if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
...@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
createHBObj(pObj); createHBObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode //launch a timer to send heartbeat to maintain the connection and send status to mnode
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
return 0; return 0;
} }
......
...@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
registerSqlObj(pSql); registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
taosAddRef(tscRefId, pObj); pObj->rid = taosAddRef(tscRefId, pObj);
return pSql; return pSql;
} }
...@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) { ...@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) {
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->pRpcCtx); rpcCancelRequest(pHb->rpcRid);
pHb->pRpcCtx = NULL; pHb->rpcRid = -1;
} }
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
...@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) { ...@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) {
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
taosRemoveRef(tscRefId, pObj); taosRemoveRef(tscRefId, pObj->rid);
} }
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
...@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
assert(pSubObj->self == (SSqlObj**) p); assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->pRpcCtx != NULL) { if (pSubObj->rpcRid > 0) {
rpcCancelRequest(pSubObj->pRpcCtx); rpcCancelRequest(pSubObj->rpcRid);
pSubObj->pRpcCtx = NULL; pSubObj->rpcRid = -1;
} }
tscQueueAsyncRes(pSubObj); tscQueueAsyncRes(pSubObj);
...@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->pRpcCtx == NULL); assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
} else { } else {
if (pSql->cmd.command < TSDB_SQL_LOCAL) { if (pSql->cmd.command < TSDB_SQL_LOCAL) {
...@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) {
* reset and freed in the processMsgFromServer function, and causes the invalid * reset and freed in the processMsgFromServer function, and causes the invalid
* write problem for rpcCancelRequest. * write problem for rpcCancelRequest.
*/ */
if (pSql->pRpcCtx != NULL) { if (pSql->rpcRid > 0) {
rpcCancelRequest(pSql->pRpcCtx); rpcCancelRequest(pSql->rpcRid);
pSql->pRpcCtx = NULL; pSql->rpcRid = -1;
} }
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
......
...@@ -523,7 +523,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { ...@@ -523,7 +523,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
assert(pSqlObj->subState.numOfRemain > 0); assert(pSqlObj->subState.numOfRemain > 0);
if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) {
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
freeJoinSubqueryObj(pSqlObj); freeJoinSubqueryObj(pSqlObj);
} }
} }
...@@ -565,7 +565,7 @@ int32_t tagValCompar(const void* p1, const void* p2) { ...@@ -565,7 +565,7 @@ int32_t tagValCompar(const void* p1, const void* p2) {
return (tag1->len > tag2->len)? 1: -1; return (tag1->len > tag2->len)? 1: -1;
} }
return strncmp(tag1->data, tag2->data, tag1->len); return memcmp(tag1->data, tag2->data, tag1->len);
} }
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) { void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
......
...@@ -36,7 +36,7 @@ void * tscTmr; ...@@ -36,7 +36,7 @@ void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
int tsInsertHeadSize; int tsInsertHeadSize;
int tscRefId; int tscRefId = -1;
int tscNumOfThreads; int tscNumOfThreads;
......
...@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) { if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
taosRemoveRef(tscRefId, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid);
} }
} }
......
...@@ -119,6 +119,33 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); ...@@ -119,6 +119,33 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes); int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Semantic timestamp key definition
typedef uint64_t TKEY;
#define TKEY_INVALID UINT64_MAX
#define TKEY_NULL TKEY_INVALID
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63)
#define TKEY_DELETE_FLAG (((TKEY)1) << 62)
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG))
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0)
#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG)
#define tdGetTKEY(key) (((TKEY)ABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key)))
#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1))
static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
TSKEY key1 = tdGetKey(*(TKEY *)tkey1);
TSKEY key2 = tdGetKey(*(TKEY *)tkey2);
if (key1 < key2) {
return -1;
} else if (key1 > key2) {
return 1;
} else {
return 0;
}
}
// ----------------- Data row structure // ----------------- Data row structure
/* A data row, the format is like below: /* A data row, the format is like below:
...@@ -129,6 +156,8 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); ...@@ -129,6 +156,8 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part | * | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/ */
typedef void *SDataRow; typedef void *SDataRow;
...@@ -137,11 +166,13 @@ typedef void *SDataRow; ...@@ -137,11 +166,13 @@ typedef void *SDataRow;
#define dataRowLen(r) (*(uint16_t *)(r)) #define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)) #define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
#define dataRowKey(r) tdGetKey(dataRowTKey(r))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetVersion(r, v) (dataRowVersion(r) = (v)) #define dataRowSetVersion(r, v) (dataRowVersion(r) = (v))
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r))
SDataRow tdNewDataRowFromSchema(STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row); void tdFreeDataRow(SDataRow row);
...@@ -154,16 +185,18 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i ...@@ -154,16 +185,18 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
switch (type) { if (IS_VAR_DATA_TYPE(type)) {
case TSDB_DATA_TYPE_BINARY: *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
case TSDB_DATA_TYPE_NCHAR: memcpy(ptr, value, varDataTLen(value));
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); dataRowLen(row) += varDataTLen(value);
memcpy(ptr, value, varDataTLen(value)); } else {
dataRowLen(row) += varDataTLen(value); if (offset == 0) {
break; ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
default: TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]);
} else {
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
break; }
} }
return 0; return 0;
...@@ -171,12 +204,10 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i ...@@ -171,12 +204,10 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
// NOTE: offset here including the header size // NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
switch (type) { if (IS_VAR_DATA_TYPE(type)) {
case TSDB_DATA_TYPE_BINARY: return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset));
case TSDB_DATA_TYPE_NCHAR: } else {
return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); return POINTER_SHIFT(row, offset);
default:
return POINTER_SHIFT(row, offset);
} }
} }
...@@ -196,7 +227,6 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } ...@@ -196,7 +227,6 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
...@@ -204,28 +234,20 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); ...@@ -204,28 +234,20 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data // Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
switch (pCol->type) { if (IS_VAR_DATA_TYPE(pCol->type)) {
case TSDB_DATA_TYPE_BINARY: return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
case TSDB_DATA_TYPE_NCHAR: } else {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
break;
default:
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
break;
} }
} }
static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
ASSERT(rows > 0); ASSERT(rows > 0);
switch (pDataCol->type) { if (IS_VAR_DATA_TYPE(pDataCol->type)) {
case TSDB_DATA_TYPE_BINARY: return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1));
case TSDB_DATA_TYPE_NCHAR: } else {
return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1)); return TYPE_BYTES[pDataCol->type] * rows;
break;
default:
return TYPE_BYTES[pDataCol->type] * rows;
} }
} }
...@@ -243,9 +265,14 @@ typedef struct { ...@@ -243,9 +265,14 @@ typedef struct {
} SDataCols; } SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) #define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
#define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) #define dataColsTKeyFirst(pCols) (((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, 0))
#define dataColsKeyFirst(pCols) (((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, 0))
#define dataColsTKeyLast(pCols) \
(((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, (pCols)->numOfRows - 1))
#define dataColsKeyLast(pCols) \
(((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols); void tdResetDataCols(SDataCols *pCols);
...@@ -253,10 +280,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); ...@@ -253,10 +280,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols); void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
// ----------------- K-V data row structure // ----------------- K-V data row structure
/* /*
......
...@@ -88,6 +88,7 @@ extern int16_t tsWAL; ...@@ -88,6 +88,7 @@ extern int16_t tsWAL;
extern int32_t tsFsyncPeriod; extern int32_t tsFsyncPeriod;
extern int32_t tsReplications; extern int32_t tsReplications;
extern int32_t tsQuorum; extern int32_t tsQuorum;
extern int32_t tsUpdate;
// balance // balance
extern int32_t tsEnableBalance; extern int32_t tsEnableBalance;
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
/** /**
* Duplicate the schema and return a new object * Duplicate the schema and return a new object
*/ */
...@@ -202,7 +205,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -202,7 +205,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
pDataCol->len = 0; pDataCol->len = 0;
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pDataCol->type)) {
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
pDataCol->spaceSize = pDataCol->bytes * maxPoints; pDataCol->spaceSize = pDataCol->bytes * maxPoints;
...@@ -215,60 +218,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -215,60 +218,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
} }
} }
// value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
switch (pCol->type) { if (IS_VAR_DATA_TYPE(pCol->type)) {
case TSDB_DATA_TYPE_BINARY: // set offset
case TSDB_DATA_TYPE_NCHAR: pCol->dataOff[numOfRows] = pCol->len;
// set offset // Copy data
pCol->dataOff[numOfRows] = pCol->len; memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Copy data // Update the length
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); pCol->len += varDataTLen(value);
// Update the length
pCol->len += varDataTLen(value);
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
break;
}
}
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
int pointsLeft = numOfRows - pointsToPop;
ASSERT(pointsLeft > 0);
if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
ASSERT(pCol->len > 0);
VarDataOffsetT toffset = pCol->dataOff[pointsToPop];
pCol->len = pCol->len - toffset;
ASSERT(pCol->len > 0);
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
dataColSetOffset(pCol, pointsLeft);
} else { } else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); pCol->len += pCol->bytes;
} }
} }
bool isNEleNull(SDataCol *pCol, int nEle) { bool isNEleNull(SDataCol *pCol, int nEle) {
switch (pCol->type) { for (int i = 0; i < nEle; i++) {
case TSDB_DATA_TYPE_BINARY: if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
case TSDB_DATA_TYPE_NCHAR:
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
}
return true;
default:
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
}
return true;
} }
return true;
} }
void dataColSetNullAt(SDataCol *pCol, int index) { void dataColSetNullAt(SDataCol *pCol, int index) {
...@@ -390,7 +362,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -390,7 +362,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
ASSERT(pDataCols->cols[i].dataOff != NULL); ASSERT(pDataCols->cols[i].dataOff != NULL);
pRet->cols[i].dataOff = pRet->cols[i].dataOff =
(int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
...@@ -400,7 +372,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -400,7 +372,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].len = pDataCols->cols[i].len;
if (pDataCols->cols[i].len > 0) { if (pDataCols->cols[i].len > 0) {
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
} }
} }
...@@ -420,58 +392,54 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -420,58 +392,54 @@ void tdResetDataCols(SDataCols *pCols) {
} }
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0; int rcol = 0;
int dcol = 0; int dcol = 0;
while (dcol < pCols->numOfCols) { if (dataRowDeleted(row)) {
SDataCol *pDataCol = &(pCols->cols[dcol]); for (; dcol < pCols->numOfCols; dcol++) {
if (rcol >= schemaNCols(pSchema)) { SDataCol *pDataCol = &(pCols->cols[dcol]);
dataColSetNullAt(pDataCol, pCols->numOfRows); if (dcol == 0) {
dcol++; dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints);
continue; } else {
dataColSetNullAt(pDataCol, pCols->numOfRows);
}
} }
} else {
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColSetNullAt(pDataCol, pCols->numOfRows);
dcol++;
continue;
}
STColumn *pRowCol = schemaColAt(pSchema, rcol); STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE); void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
dataColSetNullAt(pDataCol, pCols->numOfRows); dataColSetNullAt(pDataCol, pCols->numOfRows);
dcol++; dcol++;
}
} }
} }
pCols->numOfRows++; pCols->numOfRows++;
} }
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int pointsLeft = pCols->numOfRows - pointsToPop;
if (pointsLeft <= 0) {
tdResetDataCols(pCols);
return;
}
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pCol = pCols->cols + iCol;
dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
}
pCols->numOfRows = pointsLeft;
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
ASSERT(target->numOfCols == source->numOfCols); ASSERT(target->numOfCols == source->numOfCols);
SDataCols *pTarget = NULL; SDataCols *pTarget = NULL;
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) { for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) { if (source->cols[j].len > 0) {
...@@ -499,17 +467,23 @@ _err: ...@@ -499,17 +467,23 @@ _err:
return -1; return -1;
} }
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) { // src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows) {
tdResetDataCols(target); tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
while (target->numOfRows < tRows) { while (target->numOfRows < tRows) {
if (*iter1 >= limit1 && *iter2 >= limit2) break; if (*iter1 >= limit1 && *iter2 >= limit2) break;
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
if (key1 <= key2) { ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) { if (src1->cols[i].len > 0) {
...@@ -520,19 +494,23 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -520,19 +494,23 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
target->numOfRows++; target->numOfRows++;
(*iter1)++; (*iter1)++;
if (key1 == key2) (*iter2)++; } else if (key1 >= key2) {
} else { if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
for (int i = 0; i < src2->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0) { if (src2->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints); target->maxPoints);
}
} }
target->numOfRows++;
} }
target->numOfRows++;
(*iter2)++; (*iter2)++;
if (key1 == key2) (*iter1)++;
} }
ASSERT(target->numOfRows <= target->maxPoints);
} }
} }
......
...@@ -120,6 +120,7 @@ int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; ...@@ -120,6 +120,7 @@ int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP; int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
...@@ -786,6 +787,16 @@ static void doInitGlobalConfig(void) { ...@@ -786,6 +787,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "update";
cfg.ptr = &tsUpdate;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_DB_UPDATE;
cfg.maxValue = TSDB_MAX_DB_UPDATE;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttHostName"; cfg.option = "mqttHostName";
cfg.ptr = tsMqttHostName; cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING; cfg.valType = TAOS_CFG_VTYPE_STRING;
......
...@@ -75,6 +75,7 @@ extern const int32_t TYPE_BYTES[11]; ...@@ -75,6 +75,7 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_DATA_SMALLINT_NULL 0x8000 #define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000L #define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L #define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN #define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN #define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
...@@ -363,6 +364,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -363,6 +364,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_WAL_LEVEL 2 #define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_UPDATE 1
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_FSYNC_PERIOD 0 #define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond #define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second #define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
......
...@@ -544,6 +544,8 @@ typedef struct { ...@@ -544,6 +544,8 @@ typedef struct {
int8_t replications; int8_t replications;
int8_t quorum; int8_t quorum;
int8_t ignoreExist; int8_t ignoreExist;
int8_t update;
int8_t reserve[9];
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct { typedef struct {
...@@ -654,7 +656,8 @@ typedef struct { ...@@ -654,7 +656,8 @@ typedef struct {
int8_t replications; int8_t replications;
int8_t wals; int8_t wals;
int8_t quorum; int8_t quorum;
int8_t reserved[16]; int8_t update;
int8_t reserved[15];
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
......
...@@ -83,13 +83,13 @@ void rpcClose(void *); ...@@ -83,13 +83,13 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(void *pContext); void rpcCancelRequest(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -65,6 +65,7 @@ typedef struct { ...@@ -65,6 +65,7 @@ typedef struct {
int32_t maxRowsPerFileBlock; // maximum rows per file block int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t update;
} STsdbCfg; } STsdbCfg;
// --------- TSDB REPOSITORY USAGE STATISTICS // --------- TSDB REPOSITORY USAGE STATISTICS
......
...@@ -106,13 +106,13 @@ typedef void* tsync_h; ...@@ -106,13 +106,13 @@ typedef void* tsync_h;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
tsync_h syncStart(const SSyncInfo *); int64_t syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle); void syncStop(int64_t rid);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes: void syncRecover(int64_t rid); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *); int syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
......
...@@ -114,114 +114,115 @@ ...@@ -114,114 +114,115 @@
#define TK_FSYNC 95 #define TK_FSYNC 95
#define TK_COMP 96 #define TK_COMP 96
#define TK_PRECISION 97 #define TK_PRECISION 97
#define TK_LP 98 #define TK_UPDATE 98
#define TK_RP 99 #define TK_LP 99
#define TK_TAGS 100 #define TK_RP 100
#define TK_USING 101 #define TK_TAGS 101
#define TK_AS 102 #define TK_USING 102
#define TK_COMMA 103 #define TK_AS 103
#define TK_NULL 104 #define TK_COMMA 104
#define TK_SELECT 105 #define TK_NULL 105
#define TK_UNION 106 #define TK_SELECT 106
#define TK_ALL 107 #define TK_UNION 107
#define TK_FROM 108 #define TK_ALL 108
#define TK_VARIABLE 109 #define TK_FROM 109
#define TK_INTERVAL 110 #define TK_VARIABLE 110
#define TK_FILL 111 #define TK_INTERVAL 111
#define TK_SLIDING 112 #define TK_FILL 112
#define TK_ORDER 113 #define TK_SLIDING 113
#define TK_BY 114 #define TK_ORDER 114
#define TK_ASC 115 #define TK_BY 115
#define TK_DESC 116 #define TK_ASC 116
#define TK_GROUP 117 #define TK_DESC 117
#define TK_HAVING 118 #define TK_GROUP 118
#define TK_LIMIT 119 #define TK_HAVING 119
#define TK_OFFSET 120 #define TK_LIMIT 120
#define TK_SLIMIT 121 #define TK_OFFSET 121
#define TK_SOFFSET 122 #define TK_SLIMIT 122
#define TK_WHERE 123 #define TK_SOFFSET 123
#define TK_NOW 124 #define TK_WHERE 124
#define TK_RESET 125 #define TK_NOW 125
#define TK_QUERY 126 #define TK_RESET 126
#define TK_ADD 127 #define TK_QUERY 127
#define TK_COLUMN 128 #define TK_ADD 128
#define TK_TAG 129 #define TK_COLUMN 129
#define TK_CHANGE 130 #define TK_TAG 130
#define TK_SET 131 #define TK_CHANGE 131
#define TK_KILL 132 #define TK_SET 132
#define TK_CONNECTION 133 #define TK_KILL 133
#define TK_STREAM 134 #define TK_CONNECTION 134
#define TK_COLON 135 #define TK_STREAM 135
#define TK_ABORT 136 #define TK_COLON 136
#define TK_AFTER 137 #define TK_ABORT 137
#define TK_ATTACH 138 #define TK_AFTER 138
#define TK_BEFORE 139 #define TK_ATTACH 139
#define TK_BEGIN 140 #define TK_BEFORE 140
#define TK_CASCADE 141 #define TK_BEGIN 141
#define TK_CLUSTER 142 #define TK_CASCADE 142
#define TK_CONFLICT 143 #define TK_CLUSTER 143
#define TK_COPY 144 #define TK_CONFLICT 144
#define TK_DEFERRED 145 #define TK_COPY 145
#define TK_DELIMITERS 146 #define TK_DEFERRED 146
#define TK_DETACH 147 #define TK_DELIMITERS 147
#define TK_EACH 148 #define TK_DETACH 148
#define TK_END 149 #define TK_EACH 149
#define TK_EXPLAIN 150 #define TK_END 150
#define TK_FAIL 151 #define TK_EXPLAIN 151
#define TK_FOR 152 #define TK_FAIL 152
#define TK_IGNORE 153 #define TK_FOR 153
#define TK_IMMEDIATE 154 #define TK_IGNORE 154
#define TK_INITIALLY 155 #define TK_IMMEDIATE 155
#define TK_INSTEAD 156 #define TK_INITIALLY 156
#define TK_MATCH 157 #define TK_INSTEAD 157
#define TK_KEY 158 #define TK_MATCH 158
#define TK_OF 159 #define TK_KEY 159
#define TK_RAISE 160 #define TK_OF 160
#define TK_REPLACE 161 #define TK_RAISE 161
#define TK_RESTRICT 162 #define TK_REPLACE 162
#define TK_ROW 163 #define TK_RESTRICT 163
#define TK_STATEMENT 164 #define TK_ROW 164
#define TK_TRIGGER 165 #define TK_STATEMENT 165
#define TK_VIEW 166 #define TK_TRIGGER 166
#define TK_COUNT 167 #define TK_VIEW 167
#define TK_SUM 168 #define TK_COUNT 168
#define TK_AVG 169 #define TK_SUM 169
#define TK_MIN 170 #define TK_AVG 170
#define TK_MAX 171 #define TK_MIN 171
#define TK_FIRST 172 #define TK_MAX 172
#define TK_LAST 173 #define TK_FIRST 173
#define TK_TOP 174 #define TK_LAST 174
#define TK_BOTTOM 175 #define TK_TOP 175
#define TK_STDDEV 176 #define TK_BOTTOM 176
#define TK_PERCENTILE 177 #define TK_STDDEV 177
#define TK_APERCENTILE 178 #define TK_PERCENTILE 178
#define TK_LEASTSQUARES 179 #define TK_APERCENTILE 179
#define TK_HISTOGRAM 180 #define TK_LEASTSQUARES 180
#define TK_DIFF 181 #define TK_HISTOGRAM 181
#define TK_SPREAD 182 #define TK_DIFF 182
#define TK_TWA 183 #define TK_SPREAD 183
#define TK_INTERP 184 #define TK_TWA 184
#define TK_LAST_ROW 185 #define TK_INTERP 185
#define TK_RATE 186 #define TK_LAST_ROW 186
#define TK_IRATE 187 #define TK_RATE 187
#define TK_SUM_RATE 188 #define TK_IRATE 188
#define TK_SUM_IRATE 189 #define TK_SUM_RATE 189
#define TK_AVG_RATE 190 #define TK_SUM_IRATE 190
#define TK_AVG_IRATE 191 #define TK_AVG_RATE 191
#define TK_TBID 192 #define TK_AVG_IRATE 192
#define TK_SEMI 193 #define TK_TBID 193
#define TK_NONE 194 #define TK_SEMI 194
#define TK_PREV 195 #define TK_NONE 195
#define TK_LINEAR 196 #define TK_PREV 196
#define TK_IMPORT 197 #define TK_LINEAR 197
#define TK_METRIC 198 #define TK_IMPORT 198
#define TK_TBNAME 199 #define TK_METRIC 199
#define TK_JOIN 200 #define TK_TBNAME 200
#define TK_METRICS 201 #define TK_JOIN 201
#define TK_STABLE 202 #define TK_METRICS 202
#define TK_INSERT 203 #define TK_STABLE 203
#define TK_INTO 204 #define TK_INSERT 204
#define TK_VALUES 205 #define TK_INTO 205
#define TK_VALUES 206
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -172,7 +172,8 @@ typedef struct { ...@@ -172,7 +172,8 @@ typedef struct {
int8_t walLevel; int8_t walLevel;
int8_t replications; int8_t replications;
int8_t quorum; int8_t quorum;
int8_t reserved[12]; int8_t update;
int8_t reserved[11];
} SDbCfg; } SDbCfg;
typedef struct SDbObj { typedef struct SDbObj {
......
...@@ -319,6 +319,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { ...@@ -319,6 +319,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
} }
#endif #endif
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) {
mError("invalid db option update:%d valid range: [%d, %d]", pCfg->update, TSDB_MIN_DB_UPDATE, TSDB_MAX_DB_UPDATE);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -339,6 +344,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -339,6 +344,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL; if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL;
if (pCfg->replications < 0) pCfg->replications = tsReplications; if (pCfg->replications < 0) pCfg->replications = tsReplications;
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum; if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
if (pCfg->update < 0) pCfg->update = tsUpdate;
} }
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -391,7 +397,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * ...@@ -391,7 +397,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
.compression = pCreate->compression, .compression = pCreate->compression,
.walLevel = pCreate->walLevel, .walLevel = pCreate->walLevel,
.replications = pCreate->replications, .replications = pCreate->replications,
.quorum = pCreate->quorum .quorum = pCreate->quorum,
.update = pCreate->update
}; };
mnodeSetDefaultDbCfg(&pDb->cfg); mnodeSetDefaultDbCfg(&pDb->cfg);
...@@ -610,6 +617,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn ...@@ -610,6 +617,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "update");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status"); strcpy(pSchema[cols].name, "status");
...@@ -749,6 +762,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -749,6 +762,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.update;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pDb->status == TSDB_DB_STATUS_READY) { if (pDb->status == TSDB_DB_STATUS_READY) {
const char *src = "ready"; const char *src = "ready";
...@@ -848,6 +865,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -848,6 +865,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
int8_t replications = pAlter->replications; int8_t replications = pAlter->replications;
int8_t quorum = pAlter->quorum; int8_t quorum = pAlter->quorum;
int8_t precision = pAlter->precision; int8_t precision = pAlter->precision;
int8_t update = pAlter->update;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -950,6 +968,16 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -950,6 +968,16 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
newCfg.quorum = quorum; newCfg.quorum = quorum;
} }
if (update >= 0 && update != pDb->cfg.update) {
#if 0
mDebug("db:%s, update:%d change to %d", pDb->name, pDb->cfg.update, update);
newCfg.update = update;
#else
mError("db:%s, can't alter update option", pDb->name);
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
#endif
}
return newCfg; return newCfg;
} }
......
...@@ -72,7 +72,7 @@ typedef struct { ...@@ -72,7 +72,7 @@ typedef struct {
ESyncRole role; ESyncRole role;
ESdbStatus status; ESdbStatus status;
int64_t version; int64_t version;
void * sync; int64_t sync;
void * wal; void * wal;
SSyncCfg cfg; SSyncCfg cfg;
int32_t numOfTables; int32_t numOfTables;
...@@ -212,7 +212,7 @@ static void sdbRestoreTables() { ...@@ -212,7 +212,7 @@ static void sdbRestoreTables() {
} }
void sdbUpdateMnodeRoles() { void sdbUpdateMnodeRoles() {
if (tsSdbObj.sync == NULL) return; if (tsSdbObj.sync <= 0) return;
SNodesRole roles = {0}; SNodesRole roles = {0};
syncGetNodesRole(tsSdbObj.sync, &roles); syncGetNodesRole(tsSdbObj.sync, &roles);
...@@ -433,7 +433,7 @@ void sdbCleanUp() { ...@@ -433,7 +433,7 @@ void sdbCleanUp() {
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
syncStop(tsSdbObj.sync); syncStop(tsSdbObj.sync);
tsSdbObj.sync = NULL; tsSdbObj.sync = -1;
} }
if (tsSdbObj.wal) { if (tsSdbObj.wal) {
......
...@@ -850,6 +850,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { ...@@ -850,6 +850,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->replications = (int8_t) pVgroup->numOfVnodes; pCfg->replications = (int8_t) pVgroup->numOfVnodes;
pCfg->wals = 3; pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum; pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update;
SVnodeDesc *pNodes = pVnode->nodes; SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
......
...@@ -129,6 +129,7 @@ typedef struct SCreateDBInfo { ...@@ -129,6 +129,7 @@ typedef struct SCreateDBInfo {
int32_t compressionLevel; int32_t compressionLevel;
SStrToken precision; SStrToken precision;
bool ignoreExists; bool ignoreExists;
int8_t update;
tVariantList *keep; tVariantList *keep;
} SCreateDBInfo; } SCreateDBInfo;
......
...@@ -239,6 +239,7 @@ wal(Y) ::= WAL INTEGER(X). { Y = X; } ...@@ -239,6 +239,7 @@ wal(Y) ::= WAL INTEGER(X). { Y = X; }
fsync(Y) ::= FSYNC INTEGER(X). { Y = X; } fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
comp(Y) ::= COMP INTEGER(X). { Y = X; } comp(Y) ::= COMP INTEGER(X). { Y = X; }
prec(Y) ::= PRECISION STRING(X). { Y = X; } prec(Y) ::= PRECISION STRING(X). { Y = X; }
update(Y) ::= UPDATE INTEGER(X). { Y = X; }
%type db_optr {SCreateDBInfo} %type db_optr {SCreateDBInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);} db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
...@@ -256,6 +257,7 @@ db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z ...@@ -256,6 +257,7 @@ db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z
db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; } db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; } db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
%type alter_db_optr {SCreateDBInfo} %type alter_db_optr {SCreateDBInfo}
alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);} alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);}
...@@ -267,6 +269,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = s ...@@ -267,6 +269,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = s
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
%type typename {TAOS_FIELD} %type typename {TAOS_FIELD}
typename(A) ::= ids(X). { typename(A) ::= ids(X). {
......
...@@ -149,7 +149,7 @@ typedef struct SResultRowCellInfo { ...@@ -149,7 +149,7 @@ typedef struct SResultRowCellInfo {
int8_t hasResult; // result generated, not NULL value int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized bool initialized; // output buffer has been initialized
bool complete; // query has completed bool complete; // query has completed
uint16_t numOfRes; // num of output result in current buffer uint32_t numOfRes; // num of output result in current buffer
} SResultRowCellInfo; } SResultRowCellInfo;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo))) #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
......
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
#include "tstoken.h" #include "tstoken.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tulog.h" #include "tulog.h"
#include "tutil.h"
/* /*
* *
...@@ -415,9 +414,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -415,9 +414,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
} }
if (cond.start != NULL) { if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_ASC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else { } else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->keyInfo.type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
} }
if (cond.start != NULL) { if (cond.start != NULL) {
...@@ -432,7 +431,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -432,7 +431,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
break; break;
} }
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
...@@ -450,7 +449,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -450,7 +449,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret == 0 && optr == TSDB_RELATION_GREATER) { if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue; continue;
} else { } else {
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
comp = false; comp = false;
} }
...@@ -465,22 +464,22 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -465,22 +464,22 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
continue; continue;
} }
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
comp = true; comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) { while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0); comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) { if (comp) {
continue; continue;
} }
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
...@@ -504,7 +503,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -504,7 +503,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret == 0 && optr == TSDB_RELATION_LESS) { if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue; continue;
} else { } else {
STableKeyInfo info = {.pTable = *(void **)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
comp = false; // no need to compare anymore comp = false; // no need to compare anymore
} }
...@@ -518,7 +517,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -518,7 +517,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type); bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) || if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
(pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) { (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
STableKeyInfo info = {.pTable = *(void **)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
} }
...@@ -682,7 +681,7 @@ static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSki ...@@ -682,7 +681,7 @@ static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSki
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
if (filterItem(pExpr, pNode, param)) { if (filterItem(pExpr, pNode, param)) {
taosArrayPush(pResult, SL_GET_NODE_DATA(pNode)); taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
} }
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
...@@ -697,7 +696,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -697,7 +696,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
char * pData = SL_GET_NODE_DATA(pNode); char * pData = SL_GET_NODE_DATA(pNode);
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData); tstr *name = (tstr*) tsdbGetTableName((void*) pData);
// todo speed up by using hash // todo speed up by using hash
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
...@@ -711,7 +710,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -711,7 +710,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
} }
if (addToResult) { if (addToResult) {
STableKeyInfo info = {.pTable = *(void**)pData, .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info); taosArrayPush(res, &info);
} }
} }
......
...@@ -1609,7 +1609,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1609,7 +1609,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool);//calloc(1, sizeof(SResultRow)); pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool);
if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
goto _clean; goto _clean;
...@@ -1745,6 +1745,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1745,6 +1745,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
taosTFree(pRuntimeEnv->keyBuf); taosTFree(pRuntimeEnv->keyBuf);
taosTFree(pRuntimeEnv->rowCellInfoOffset);
taosHashCleanup(pRuntimeEnv->pResultRowHashTable); taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
pRuntimeEnv->pResultRowHashTable = NULL; pRuntimeEnv->pResultRowHashTable = NULL;
......
...@@ -170,7 +170,7 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows ...@@ -170,7 +170,7 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
int32_t numOfRows = taosNumOfRemainRows(pFillInfo); int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
TSKEY ekey1 = ekey; TSKEY ekey1 = ekey;
if (pFillInfo->order != TSDB_ORDER_ASC) { if (!FILL_IS_ASC_FILL(pFillInfo)) {
pFillInfo->endKey = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision); pFillInfo->endKey = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision);
} }
......
...@@ -168,7 +168,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -168,7 +168,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
(*pHisto)->numOfEntries += 1; (*pHisto)->numOfEntries += 1;
} }
} else { /* insert a new slot */ } else { /* insert a new slot */
if ((*pHisto)->numOfElems > 1 && idx < (*pHisto)->numOfEntries) { if ((*pHisto)->numOfElems >= 1 && idx < (*pHisto)->numOfEntries) {
if (idx > 0) { if (idx > 0) {
assert((*pHisto)->elems[idx - 1].val <= val); assert((*pHisto)->elems[idx - 1].val <= val);
} }
...@@ -661,4 +661,4 @@ SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2 ...@@ -661,4 +661,4 @@ SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2
free(pHistoBins); free(pHistoBins);
return pResHistogram; return pResHistogram;
} }
\ No newline at end of file
...@@ -872,5 +872,6 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { ...@@ -872,5 +872,6 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo->quorum = -1; pDBInfo->quorum = -1;
pDBInfo->keep = NULL; pDBInfo->keep = NULL;
pDBInfo->update = -1;
memset(&pDBInfo->precision, 0, sizeof(SStrToken)); memset(&pDBInfo->precision, 0, sizeof(SStrToken));
} }
...@@ -155,6 +155,7 @@ static SKeyword keywordTable[] = { ...@@ -155,6 +155,7 @@ static SKeyword keywordTable[] = {
{"INSERT", TK_INSERT}, {"INSERT", TK_INSERT},
{"INTO", TK_INTO}, {"INTO", TK_INTO},
{"VALUES", TK_VALUES}, {"VALUES", TK_VALUES},
{"UPDATE", TK_UPDATE},
{"RESET", TK_RESET}, {"RESET", TK_RESET},
{"QUERY", TK_QUERY}, {"QUERY", TK_QUERY},
{"ADD", TK_ADD}, {"ADD", TK_ADD},
...@@ -664,4 +665,4 @@ void taosCleanupKeywordsTable() { ...@@ -664,4 +665,4 @@ void taosCleanupKeywordsTable() {
if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) {
taosHashCleanup(m); taosHashCleanup(m);
} }
} }
\ No newline at end of file
此差异已折叠。
...@@ -82,6 +82,7 @@ typedef struct { ...@@ -82,6 +82,7 @@ typedef struct {
int8_t oldInUse; // server EP inUse passed by app int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
...@@ -220,8 +221,7 @@ static void rpcFree(void *p) { ...@@ -220,8 +221,7 @@ static void rpcFree(void *p) {
free(p); free(p);
} }
static void rpcInit(void) { void rpcInit(void) {
tsProgressTimer = tsRpcTimer/2; tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcHeadSize = RPC_MSG_OVERHEAD;
...@@ -230,6 +230,11 @@ static void rpcInit(void) { ...@@ -230,6 +230,11 @@ static void rpcInit(void) {
tsRpcRefId = taosOpenRef(200, rpcFree); tsRpcRefId = taosOpenRef(200, rpcFree);
} }
void rpcCleanup(void) {
taosCloseRef(tsRpcRefId);
tsRpcRefId = -1;
}
void *rpcOpen(const SRpcInit *pInit) { void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc; SRpcInfo *pRpc;
...@@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
// set the handle to pContext, so app can cancel the request // set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext; if (pMsg->handle) *((void **)pMsg->handle) = pContext;
taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
return; return pContext->rid;
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { ...@@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return code; return code;
} }
void rpcCancelRequest(void *handle) { void rpcCancelRequest(int64_t rid) {
SRpcReqContext *pContext = handle;
int code = taosAcquireRef(tsRpcRefId, pContext); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
if (code < 0) return; if (pContext == NULL) return;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
taosReleaseRef(tsRpcRefId, pContext); taosReleaseRef(tsRpcRefId, rid);
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
...@@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
// if there is an outgoing message, free it // if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) { if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL; if (pConn->pContext) pConn->pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pConn->pContext); taosRemoveRef(tsRpcRefId, pConn->pContext->rid);
} }
} }
...@@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} }
// free the request message // free the request message
taosRemoveRef(tsRpcRefId, pContext); taosRemoveRef(tsRpcRefId, pContext->rid);
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
...@@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc)
tDebug("%s rpc resources are released", pRpc->label); tDebug("%s rpc resources are released", pRpc->label);
taosTFree(pRpc); taosTFree(pRpc);
int count = atomic_sub_fetch_32(&tsRpcNum, 1); atomic_sub_fetch_32(&tsRpcNum, 1);
if (count == 0) {
// taosCloseRef(tsRpcRefId);
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
}
} }
} }
...@@ -141,6 +141,7 @@ typedef struct SSyncNode { ...@@ -141,6 +141,7 @@ typedef struct SSyncNode {
int8_t replica; int8_t replica;
int8_t quorum; int8_t quorum;
uint32_t vgId; uint32_t vgId;
int64_t rid;
void *ahandle; void *ahandle;
int8_t selfIndex; int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
......
...@@ -142,14 +142,14 @@ void syncCleanUp() { ...@@ -142,14 +142,14 @@ void syncCleanUp() {
sInfo("sync module is cleaned up"); sInfo("sync module is cleaned up");
} }
void *syncStart(const SSyncInfo *pInfo) { int64_t syncStart(const SSyncInfo *pInfo) {
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1);
if (pNode == NULL) { if (pNode == NULL) {
sError("no memory to allocate syncNode"); sError("no memory to allocate syncNode");
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return -1;
} }
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
...@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum; pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
int ret = taosAddRef(tsSyncRefId, pNode); pNode->rid = taosAddRef(tsSyncRefId, pNode);
if (ret < 0) { if (pNode->rid < 0) {
syncFreeNode(pNode); syncFreeNode(pNode);
return NULL; return -1;
} }
for (int i = 0; i < pCfg->replica; ++i) { for (int i = 0; i < pCfg->replica; ++i) {
...@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->selfIndex < 0) { if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId); sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG; terrno = TSDB_CODE_SYN_INVALID_CONFIG;
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
nodeVersion = pInfo->version; // set the initial version nodeVersion = pInfo->version; // set the initial version
...@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
if (pNode->pFwdTimer == NULL) { if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId); sError("vgId:%d, failed to allocate timer", pNode->vgId);
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
syncAddArbitrator(pNode); syncAddArbitrator(pNode);
...@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
return pNode; return pNode->rid;
} }
void syncStop(void *param) { void syncStop(int64_t rid) {
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return; if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId); sInfo("vgId:%d, cleanup sync", pNode->vgId);
...@@ -245,16 +244,15 @@ void syncStop(void *param) { ...@@ -245,16 +244,15 @@ void syncStop(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
taosRemoveRef(tsSyncRefId, pNode); taosRemoveRef(tsSyncRefId, rid);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
SSyncNode *pNode = param;
int i, j; int i, j;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica); pNode->replica);
...@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { ...@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole[nodeRole]); syncRole[nodeRole]);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return 0;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) { void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && pNode->quorum > 1) {
...@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { ...@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
} }
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
void syncRecover(void *param) { void syncRecover(int64_t rid) {
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return; if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK // to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work // if take this node to unsync state, the whole system may not work
...@@ -393,14 +386,12 @@ void syncRecover(void *param) { ...@@ -393,14 +386,12 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return -1;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return -1;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
...@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { ...@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
pNodesRole->role[i] = pNode->peerInfo[i]->role; pNodesRole->role[i] = pNode->peerInfo[i]->role;
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
...@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); ...@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1);
int syncDecPeerRef(SSyncPeer *pPeer) { int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id); sDebug("%s, resource is freed", pPeer->id);
taosTFree(pPeer->watchFd); taosTFree(pPeer->watchFd);
...@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
} }
taosAcquireRef(tsSyncRefId, pNode); taosAcquireRef(tsSyncRefId, pNode->rid);
return pPeer; return pPeer;
} }
...@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
...@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) {
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, pNode->rid);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
...@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
static void syncMonitorFwdInfos(void *param, void *tmrId) { static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param; int64_t rid = (int64_t) param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
int ret = taosAcquireRef(tsSyncRefId, pNode); if (pNode == NULL) return;
if ( ret < 0) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
...@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) {
......
...@@ -30,7 +30,7 @@ int dataFd = -1; ...@@ -30,7 +30,7 @@ int dataFd = -1;
void * qhandle = NULL; void * qhandle = NULL;
int walNum = 0; int walNum = 0;
uint64_t tversion = 0; uint64_t tversion = 0;
void * syncHandle; int64_t syncHandle;
int role; int role;
int nodeId; int nodeId;
char path[256]; char path[256];
......
...@@ -320,6 +320,15 @@ typedef struct { ...@@ -320,6 +320,15 @@ typedef struct {
void* compBuffer; // Buffer for temperary compress/decompress purpose void* compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
typedef struct {
int rowsInserted;
int rowsUpdated;
int rowsDeleteSucceed;
int rowsDeleteFailed;
int nOperations;
TSKEY keyFirst;
TSKEY keyLast;
} SMergeInfo;
// ------------------ tsdbScan.c // ------------------ tsdbScan.c
typedef struct { typedef struct {
SFileGroup fGroup; SFileGroup fGroup;
...@@ -422,7 +431,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo); ...@@ -422,7 +431,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
// ------------------ tsdbMemTable.c // ------------------ tsdbMemTable.c
int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
...@@ -430,7 +439,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem) ...@@ -430,7 +439,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem)
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TSKEY* filterKeys, int nFilterKeys); TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
...@@ -438,16 +447,23 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { ...@@ -438,16 +447,23 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
SSkipListNode* node = tSkipListIterGet(pIter); SSkipListNode* node = tSkipListIterGet(pIter);
if (node == NULL) return NULL; if (node == NULL) return NULL;
return *(SDataRow *)SL_GET_NODE_DATA(node); return (SDataRow)SL_GET_NODE_DATA(node);
} }
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
SDataRow row = tsdbNextIterRow(pIter); SDataRow row = tsdbNextIterRow(pIter);
if (row == NULL) return -1; if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
return dataRowKey(row); return dataRowKey(row);
} }
static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
SDataRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TKEY_NULL;
return dataRowTKey(row);
}
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL); ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL; if (pRepo->mem == NULL) return NULL;
......
...@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
} }
} }
// update check
if (pCfg->update != 0) pCfg->update = 1;
return 0; return 0;
_err: _err:
...@@ -762,7 +765,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY ...@@ -762,7 +765,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
return -1; return -1;
} }
if (tsdbInsertRowToMem(pRepo, row, pTable) < 0) return -1; if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
(*affectedrows)++; (*affectedrows)++;
points++; points++;
...@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { ...@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock); tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock);
tlen += taosEncodeFixedI8(buf, pCfg->precision); tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression); tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update);
return tlen; return tlen;
} }
...@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { ...@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock)); buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock));
buf = taosDecodeFixedI8(buf, &(pCfg->precision)); buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression)); buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update));
return buf; return buf;
} }
......
...@@ -32,43 +32,40 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -32,43 +32,40 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
int32_t level = 0; TKEY tkey = dataRowTKey(row);
int32_t headSize = 0;
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem; SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL; STableData *pTableData = NULL;
SSkipList * pSList = NULL; bool isRowDelete = TKEY_IS_DELETED(tkey);
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && if (isRowDelete) {
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { if (!pCfg->update) {
pTableData = pMemTable->tData[TABLE_TID(pTable)]; tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
pSList = pTableData->pData; terrno = TSDB_CODE_TDB_INVALID_ACTION;
} return -1;
}
tSkipListNewNodeInfo(pSList, &level, &headSize);
SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *)); if (key > TABLE_LASTKEY(pTable)) {
if (pNode == NULL) { tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
return -1; return 0;
}
} }
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) { if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
free(pNode);
return -1; return -1;
} }
pNode->level = level;
dataRowCpy(pRow, row); dataRowCpy(pRow, row);
*(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow;
// Operations above may change pRepo->mem, retake those values // Operations above may change pRepo->mem, retake those values
ASSERT(pRepo->mem != NULL); ASSERT(pRepo->mem != NULL);
...@@ -77,7 +74,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -77,7 +74,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
if (TABLE_TID(pTable) >= pMemTable->maxTables) { if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
free(pNode);
return -1; return -1;
} }
} }
...@@ -97,7 +93,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -97,7 +93,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
" to table %s while create new table data object since %s", " to table %s while create new table data object since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
free(pNode);
return -1; return -1;
} }
...@@ -106,24 +101,31 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -106,24 +101,31 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
if (tSkipListPut(pTableData->pData, pNode) == NULL) { int64_t oldSize = SL_SIZE(pTableData->pData);
if (tSkipListPut(pTableData->pData, pRow) == NULL) {
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
free(pNode);
} else { } else {
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
if (isRowDelete) {
if (TABLE_LASTKEY(pTable) == key) {
// TODO: need to update table last key here (may from file)
}
} else {
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
}
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
if (pMemTable->keyLast < key) pMemTable->keyLast = key; if (pMemTable->keyLast < key) pMemTable->keyLast = key;
pMemTable->numOfRows++; pMemTable->numOfRows += deltaSize;
if (pTableData->keyFirst > key) pTableData->keyFirst = key; if (pTableData->keyFirst > key) pTableData->keyFirst = key;
if (pTableData->keyLast < key) pTableData->keyLast = key; if (pTableData->keyLast < key) pTableData->keyLast = key;
pTableData->numOfRows++; pTableData->numOfRows += deltaSize;
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
} }
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key); isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
return 0; return 0;
} }
...@@ -295,63 +297,124 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -295,63 +297,124 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
return 0; return 0;
} }
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUSH AS POSSIBLE.
*/
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TSKEY *filterKeys, int nFilterKeys) { TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0 && pMergeInfo != NULL);
if (pIter == NULL) return 0; if (pIter == NULL) return 0;
STSchema *pSchema = NULL; STSchema *pSchema = NULL;
int numOfRows = 0; TSKEY rowKey = 0;
TSKEY keyNext = 0; TSKEY fKey = 0;
bool isRowDel = false;
int filterIter = 0; int filterIter = 0;
SDataRow row = NULL;
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
pMergeInfo->keyFirst = INT64_MAX;
pMergeInfo->keyLast = INT64_MIN;
if (pCols) tdResetDataCols(pCols);
row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = dataRowKey(row);
isRowDel = dataRowDeleted(row);
}
if (nFilterKeys != 0) { // for filter purpose if (filterIter >= nFilterKeys) {
ASSERT(filterKeys != NULL); fKey = INT64_MAX;
keyNext = tsdbNextIterKey(pIter); } else {
if (keyNext < 0 || keyNext > maxKey) return numOfRows; fKey = tdGetKey(filterKeys[filterIter]);
void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE); }
filterIter = (ptr == NULL) ? nFilterKeys : (int)((POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY)));
} while (true) {
if (fKey == INT64_MAX && rowKey == INT64_MAX) break;
do {
SDataRow row = tsdbNextIterRow(pIter); if (fKey < rowKey) {
if (row == NULL) break; pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
keyNext = dataRowKey(row);
if (keyNext > maxKey) break; filterIter++;
if (filterIter >= nFilterKeys) {
bool keyFiltered = false; fKey = INT64_MAX;
if (nFilterKeys != 0) { } else {
while (true) { fKey = tdGetKey(filterKeys[filterIter]);
if (filterIter >= nFilterKeys) break; }
if (keyNext == filterKeys[filterIter]) { } else if (fKey > rowKey) {
keyFiltered = true; if (isRowDel) {
filterIter++; pMergeInfo->rowsDeleteFailed++;
break; } else {
} else if (keyNext < filterKeys[filterIter]) { if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
break; if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsInserted++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
}
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = dataRowKey(row);
isRowDel = dataRowDeleted(row);
}
} else {
if (isRowDel) {
ASSERT(!keepDup);
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsDeleteSucceed++;
pMergeInfo->nOperations++;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
} else {
if (keepDup) {
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsUpdated++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
} else { } else {
filterIter++; pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
} }
} }
}
if (!keyFiltered) { tSkipListIterNext(pIter);
if (numOfRows >= maxRowsToRead) break; row = tsdbNextIterRow(pIter);
if (pCols) { if (row == NULL || dataRowKey(row) > maxKey) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { rowKey = INT64_MAX;
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); isRowDel = false;
if (pSchema == NULL) { } else {
ASSERT(0); rowKey = dataRowKey(row);
} isRowDel = dataRowDeleted(row);
} }
tdAppendDataRowToDataCol(row, pSchema, pCols); filterIter++;
if (filterIter >= nFilterKeys) {
fKey = INT64_MAX;
} else {
fKey = tdGetKey(filterKeys[filterIter]);
} }
numOfRows++;
} }
} while (tSkipListIterNext(pIter)); }
return numOfRows; return 0;
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
...@@ -440,8 +503,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -440,8 +503,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->keyLast = 0; pTableData->keyLast = 0;
pTableData->numOfRows = 0; pTableData->numOfRows = 0;
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, pTableData->pData =
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey); tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) { if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -461,7 +525,7 @@ static void tsdbFreeTableData(STableData *pTableData) { ...@@ -461,7 +525,7 @@ static void tsdbFreeTableData(STableData *pTableData) {
} }
} }
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
static void *tsdbCommitData(void *arg) { static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbRepo * pRepo = (STsdbRepo *)arg;
...@@ -583,7 +647,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo) { ...@@ -583,7 +647,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) { for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1;
} }
return 0; return 0;
} }
...@@ -779,5 +843,21 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ...@@ -779,5 +843,21 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
taosTFree(tData); taosTFree(tData);
return 0;
}
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != dataRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
}
}
tdAppendDataRowToDataCol(row, *ppSchema, pCols);
}
return 0; return 0;
} }
\ No newline at end of file
...@@ -86,7 +86,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { ...@@ -86,7 +86,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
if (pTable != NULL) { if (pTable != NULL) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pTable), TABLE_UID(pTable));
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
goto _err;
} }
if (pCfg->type == TSDB_CHILD_TABLE) { if (pCfg->type == TSDB_CHILD_TABLE) {
...@@ -643,7 +644,7 @@ static void tsdbOrgMeta(void *pHandle) { ...@@ -643,7 +644,7 @@ static void tsdbOrgMeta(void *pHandle) {
} }
static char *getTagIndexKey(const void *pData) { static char *getTagIndexKey(const void *pData) {
STable *pTable = *(STable **)pData; STable *pTable = (STable *)pData;
STSchema *pSchema = tsdbGetTableTagSchema(pTable); STSchema *pSchema = tsdbGetTableTagSchema(pTable);
STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN);
...@@ -700,7 +701,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) { ...@@ -700,7 +701,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) {
} }
pTable->tagVal = NULL; pTable->tagVal = NULL;
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -900,23 +901,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper ...@@ -900,23 +901,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
pTable->pSuper = pSTable; pTable->pSuper = pSTable;
int32_t level = 0; tSkipListPut(pSTable->pIndex, (void *)pTable);
int32_t headSize = 0;
tSkipListNewNodeInfo(pSTable->pIndex, &level, &headSize);
// NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the
// actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function
SSkipListNode *pNode = calloc(1, headSize + sizeof(STable *));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode);
if (refSuper) T_REF_INC(pSTable); if (refSuper) T_REF_INC(pSTable);
return 0; return 0;
} }
...@@ -940,7 +926,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -940,7 +926,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
SSkipListNode *pNode = taosArrayGetP(res, i); SSkipListNode *pNode = taosArrayGetP(res, i);
// STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode); // STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
if (*(STable **)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need if ((STable *)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need
tSkipListRemoveNode(pSTable->pIndex, pNode); tSkipListRemoveNode(pSTable->pIndex, pNode);
} }
} }
...@@ -1170,8 +1156,8 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { ...@@ -1170,8 +1156,8 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
buf = tdDecodeSchema(buf, &(pTable->tagSchema)); buf = tdDecodeSchema(buf, &(pTable->tagSchema));
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable); tsdbFreeTable(pTable);
...@@ -1197,7 +1183,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { ...@@ -1197,7 +1183,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM); tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM);
} else { } else {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1)); tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (SL_SIZE(pTable->pIndex) + 1));
} else { } else {
tlen = sizeof(SListNode) + sizeof(SActObj); tlen = sizeof(SListNode) + sizeof(SActObj);
} }
...@@ -1244,7 +1230,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) { ...@@ -1244,7 +1230,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
} }
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); STable *tTable = (STable *)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE); ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, tTable); pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, tTable);
} }
...@@ -1269,7 +1255,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { ...@@ -1269,7 +1255,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
tsdbWLockRepoMeta(pRepo); tsdbWLockRepoMeta(pRepo);
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); STable *tTable = (STable *)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
tsdbRemoveTableFromMeta(pRepo, tTable, false, false); tsdbRemoveTableFromMeta(pRepo, tTable, false, false);
} }
......
此差异已折叠。
此差异已折叠。
...@@ -8,7 +8,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z) ...@@ -8,7 +8,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
IF (TD_LINUX) IF (TD_LINUX)
TARGET_LINK_LIBRARIES(tutil m rt) TARGET_LINK_LIBRARIES(tutil m rt)
ADD_SUBDIRECTORY(tests) # ADD_SUBDIRECTORY(tests)
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
IF (ICONV_INCLUDE_EXIST) IF (ICONV_INCLUDE_EXIST)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TFILE_H
#define TDENGINE_TFILE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <unistd.h>
// init taos file module
int32_t tfinit();
// clean up taos file module
void tfcleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
int64_t tfopen(const char *pathname, int32_t flags);
int64_t tfclose(int64_t tfd);
int64_t tfwrite(int64_t tfd, void *buf, int64_t count);
int64_t tfread(int64_t tfd, void *buf, int64_t count);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TREF_H
...@@ -21,38 +21,48 @@ ...@@ -21,38 +21,48 @@
extern "C" { extern "C" {
#endif #endif
// open an instance, return refId which will be used by other APIs // open a reference set, max is the mod used by hash, fp is the pointer to free resource function
int taosOpenRef(int max, void (*fp)(void *)); // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int taosOpenRef(int max, void (*fp)(void *));
// close the Ref instance // close the reference set, refId is the return value by taosOpenRef
void taosCloseRef(int refId); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosCloseRef(int refId);
// add ref, p is the pointer to resource or pointer ID // add ref, p is the pointer to resource or pointer ID
int taosAddRef(int refId, void *p); // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
#define taosRemoveRef taosReleaseRef int64_t taosAddRef(int refId, void *p);
// acquire ref, p is the pointer to resource or pointer ID // remove ref, rid is the reference ID returned by taosAddRef
int taosAcquireRef(int refId, void *p); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosRemoveRef(int rsetId, int64_t rid);
// release ref, p is the pointer to resource or pinter ID // acquire ref, rid is the reference ID returned by taosAddRef
void taosReleaseRef(int refId, void *p); // return the resource p. On error, NULL is returned, and terrno is set appropriately
void *taosAcquireRef(int rsetId, int64_t rid);
// return the first if p is null, otherwise return the next after p // release ref, rid is the reference ID returned by taosAddRef
void *taosIterateRef(int refId, void *p); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosReleaseRef(int rsetId, int64_t rid);
// return the first reference if rid is 0, otherwise return the next after current reference.
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system // return the number of references in system
int taosListRef(); int taosListRef();
/* sample code to iterate the refs /* sample code to iterate the refs
void demoIterateRefs(int refId) { void demoIterateRefs(int rsetId) {
void *p = taosIterateRef(refId, NULL); void *p = taosIterateRef(refId, 0);
while (p) { while (p) {
// process P // process P
// get the rid from p
p = taosIterateRef(refId, p); p = taosIterateRef(rsetId, rid);
} }
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -247,7 +247,7 @@ void skiplistPerformanceTest() { ...@@ -247,7 +247,7 @@ void skiplistPerformanceTest() {
printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size); printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size);
printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level); printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level);
assert(tSkipListGetSize(pSkipList) == size); assert(SL_GET_SIZE(pSkipList) == size);
// printf("the level of skiplist is:\n"); // printf("the level of skiplist is:\n");
// //
...@@ -273,7 +273,7 @@ void skiplistPerformanceTest() { ...@@ -273,7 +273,7 @@ void skiplistPerformanceTest() {
int64_t et = taosGetTimestampMs(); int64_t et = taosGetTimestampMs();
printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st); printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st);
assert(tSkipListGetSize(pSkipList) == size); assert(SL_GET_SIZE(pSkipList) == size);
tSkipListDestroy(pSkipList); tSkipListDestroy(pSkipList);
taosTFree(total); taosTFree(total);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册