提交 4de11d83 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -651,6 +651,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { ...@@ -651,6 +651,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
SDataRow trow = (SDataRow)pDataBlock; SDataRow trow = (SDataRow)pDataBlock;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
dataRowSetVersion(trow, pTableMeta->sversion);
int toffset = 0; int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; j++) { for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
...@@ -759,10 +760,6 @@ void tscCloseTscObj(STscObj* pObj) { ...@@ -759,10 +760,6 @@ void tscCloseTscObj(STscObj* pObj) {
taosTmrStopA(&(pObj->pTimer)); taosTmrStopA(&(pObj->pTimer));
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
if (pSql) {
sem_destroy(&pSql->rspSem);
}
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
if (pObj->pDnodeConn != NULL) { if (pObj->pDnodeConn != NULL) {
......
...@@ -119,22 +119,24 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); ...@@ -119,22 +119,24 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure // ----------------- Data row structure
/* A data row, the format is like below: /* A data row, the format is like below:
* |<------------------------------------- len ---------------------------------->| * |<--------------------+--------------------------- len ---------------------------------->|
* |<--Head ->|<--------- flen -------------->| | * |<-- Head -->|<--------- flen -------------->| |
* +----------+---------------------------------+---------------------------------+ * +---------------------+---------------------------------+---------------------------------+
* | int32_t | | | * | int16_t | int16_t | | |
* +----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* | len | First part | Second part | * | len | sversion | First part | Second part |
* +----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
*/ */
typedef void *SDataRow; typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t) #define TD_DATA_ROW_HEAD_SIZE sizeof(int16_t)*2
#define dataRowLen(r) (*(int32_t *)(r)) #define dataRowLen(r) (*(int16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) #define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetVersion(r, v) (dataRowVersion(r) = (v))
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
......
...@@ -159,7 +159,10 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { ...@@ -159,7 +159,10 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
/** /**
* Initialize a data row * Initialize a data row
*/ */
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); } void tdInitDataRow(SDataRow row, STSchema *pSchema) {
dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
dataRowSetVersion(row, schemaVersion(pSchema));
}
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
int32_t size = dataRowMaxBytesFromSchema(pSchema); int32_t size = dataRowMaxBytesFromSchema(pSchema);
......
...@@ -602,6 +602,7 @@ typedef struct { ...@@ -602,6 +602,7 @@ typedef struct {
} SMDVnodeDesc; } SMDVnodeDesc;
typedef struct { typedef struct {
char db[TSDB_DB_NAME_LEN + 1];
SMDVnodeCfg cfg; SMDVnodeCfg cfg;
SMDVnodeDesc nodes[TSDB_MAX_REPLICA]; SMDVnodeDesc nodes[TSDB_MAX_REPLICA];
} SMDCreateVnodeMsg; } SMDCreateVnodeMsg;
......
...@@ -539,6 +539,8 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -539,6 +539,8 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
strcpy(pVnode->db, pVgroup->dbName);
SMDVnodeCfg *pCfg = &pVnode->cfg; SMDVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId); pCfg->vgId = htonl(pVgroup->vgId);
pCfg->cfgVersion = htonl(pDb->cfgVersion); pCfg->cfgVersion = htonl(pDb->cfgVersion);
...@@ -594,7 +596,7 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) { ...@@ -594,7 +596,7 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) {
} }
void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vgId:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle); mTrace("vgId:%d, send create vnode:%d msg, ahandle:%p db:%s", pVgroup->vgId, pVgroup->vgId, ahandle, pVgroup->dbName);
SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup); SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = ahandle, .handle = ahandle,
......
...@@ -211,7 +211,7 @@ static void* taosAcceptTcpConnection(void *arg) { ...@@ -211,7 +211,7 @@ static void* taosAcceptTcpConnection(void *arg) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break; break;
} }
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
continue; continue;
} }
......
...@@ -51,6 +51,7 @@ typedef struct { ...@@ -51,6 +51,7 @@ typedef struct {
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
char *rootDir; char *rootDir;
char db[TSDB_DB_NAME_LEN + 1];
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type); int vnodeWriteToQueue(void *param, void *pHead, int type);
......
...@@ -498,7 +498,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -498,7 +498,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
} }
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pVnodeCfg->db);
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion); len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
...@@ -570,6 +570,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -570,6 +570,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON *db = cJSON_GetObjectItem(root, "db");
if (!db || db->type != cJSON_String || db->valuestring == NULL) {
vError("vgId:%d, failed to read vnode cfg, db not found", pVnode->vgId);
goto PARSE_OVER;
}
strcpy(pVnode->db, db->valuestring);
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion"); cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) { if (!cfgVersion || cfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode->vgId); vError("vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode->vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册