提交 704a7ff3 编写于 作者: L liushepeng

YTFS core services

上级 112dbf42
package com.ytfs.service;
import com.ytfs.service.codec.BlockAESDecryptor;
import com.ytfs.service.codec.BlockEncrypted;
import com.ytfs.service.codec.KeyStoreCoder;
import com.ytfs.service.codec.ObjectRefer;
import com.ytfs.service.codec.Shard;
import com.ytfs.service.codec.ShardRSDecoder;
import com.ytfs.service.net.P2PUtils;
import com.ytfs.service.node.Node;
import com.ytfs.service.node.SuperNodeList;
......@@ -12,6 +14,7 @@ import com.ytfs.service.packet.DownloadBlockInitReq;
import com.ytfs.service.packet.DownloadBlockInitResp;
import com.ytfs.service.packet.DownloadShardReq;
import com.ytfs.service.packet.DownloadShardResp;
import static com.ytfs.service.packet.ServiceErrorCode.INTERNAL_ERROR;
import static com.ytfs.service.packet.ServiceErrorCode.INVALID_SHARD;
import com.ytfs.service.packet.ServiceException;
import java.util.ArrayList;
......@@ -39,13 +42,17 @@ public class DownloadBlock {
Node pbd = SuperNodeList.getBlockSuperNode(refer.getSuperID());
Object resp = P2PUtils.requestBPU(req, pbd);
if (resp instanceof DownloadBlockDBResp) {
// this.data = aesDBDecode(((DownloadBlockDBResp) resp).getData());
this.data = aesDBDecode(((DownloadBlockDBResp) resp).getData());
} else {
DownloadBlockInitResp initresp = (DownloadBlockInitResp) resp;
if (initresp.getVNF() < 0) {
this.data = loadCopyShard(initresp);
} else {
this.data = loadRSShard(initresp);
try {
this.data = loadRSShard(initresp);
} catch (InterruptedException e) {
throw new ServiceException(INTERNAL_ERROR, e.getMessage());
}
}
}
}
......@@ -57,33 +64,48 @@ public class DownloadBlock {
}
}
private byte[] loadRSShard(DownloadBlockInitResp initresp) {
return null;
}
private void firstDownload( ) throws InterruptedException {
/*
List<Shard> shards = enc.getEnc_shards();
private byte[] loadRSShard(DownloadBlockInitResp initresp) throws InterruptedException, ServiceException {
List<Shard> shards = new ArrayList();
int len = initresp.getVNF() - UserConfig.Default_PND;
int nodeindex = 0;
for (Shard sd : shards) {
map.put(nodeindex, sd);
ShardNode n = nodes[nodeindex];
UploadShardReq req = new UploadShardReq();
req.setBPDID(bpdNode.getNodeId());
req.setBPDSIGN(n.getSign());
req.setDAT(sd.getData());
req.setSHARDID(nodeindex);
req.setVBI(VBI);
req.setVHF(sd.getVHF());
req.sign(nodes[nodeindex].getNodeId());
UploadShard.startUploadShard(req, n, this);
nodeindex++;
}
synchronized (this) {
if (resList.size() != shards.size()) {
this.wait(1000 * 15);
while (true) {
int count = len - shards.size();
if (count <= 0) {
break;
}
if (count > initresp.getNodes().length - nodeindex) {
break;
}
for (int ii = 0; ii < count; ii++) {
Node n = initresp.getNodes()[nodeindex];
byte[] VHF = initresp.getVHF()[nodeindex];
DownloadShardReq req = new DownloadShardReq();
req.setVHF(VHF);
DownloadShare.startDownloadShard(VHF, n, this);
nodeindex++;
}
synchronized (this) {
if (resList.size() != count) {
this.wait(1000 * 15);
}
}
for (DownloadShardResp res : resList) {
if (res.getData() != null) {
shards.add(new Shard(res.getData()));
}
}
}*/
resList.clear();
}
if (shards.size() >= len) {
BlockEncrypted be = new BlockEncrypted(refer.getRealSize());
ShardRSDecoder rsdec = new ShardRSDecoder(shards, be.getEncryptedBlockSize());
be = rsdec.decode();
BlockAESDecryptor dec = new BlockAESDecryptor(be.getData(), refer.getRealSize(), ks);
dec.decrypt();
return dec.getSrcData();
} else {
throw new ServiceException(INTERNAL_ERROR);
}
}
private byte[] loadCopyShard(DownloadBlockInitResp initresp) throws ServiceException {
......@@ -98,7 +120,7 @@ public class DownloadBlock {
req.setVHF(VHF);
DownloadShardResp resp = (DownloadShardResp) P2PUtils.requestNode(req, n);
if (resp.verify(VHF)) {
// return aesCopyDecode(resp.getData());
return aesCopyDecode(resp.getData());
}
index++;
} catch (ServiceException e) {
......@@ -107,20 +129,20 @@ public class DownloadBlock {
}
throw t == null ? new ServiceException(INVALID_SHARD) : t;
}
/*
private byte[] aesCopyDecode(byte[] data) {
ShardAESDecryptor dec = new ShardAESDecryptor(new Shard(data), ks);
BlockEncrypted be = new BlockEncrypted(refer.getRealSize());
ShardRSDecoder rsdec = new ShardRSDecoder(new Shard(data), be.getEncryptedBlockSize());
be = rsdec.decode();
BlockAESDecryptor dec = new BlockAESDecryptor(be.getData(), refer.getRealSize(), ks);
dec.decrypt();
Shard s = dec.getDec_shard();
byte[] newdata = new byte[refer.getRealSize()];
System.arraycopy(s.getData(), 1, data, 0, newdata.length);
return newdata;
return dec.getSrcData();
}
private byte[] aesDBDecode(byte[] data) {
BlockAESDecryptor dec = new BlockAESDecryptor(data, ks);
BlockAESDecryptor dec = new BlockAESDecryptor(data, refer.getRealSize(), ks);
dec.decrypt();
return dec.getBlock().getData();
return dec.getSrcData();
}
*/
}
......@@ -7,9 +7,12 @@ import com.ytfs.service.packet.DownloadShardReq;
import com.ytfs.service.packet.DownloadShardResp;
import com.ytfs.service.packet.ServiceException;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.log4j.Logger;
public class DownloadShare implements Runnable {
private static final Logger LOG = Logger.getLogger(DownloadShare.class);
private static final ArrayBlockingQueue<DownloadShare> queue;
static {
......@@ -40,9 +43,16 @@ public class DownloadShare implements Runnable {
DownloadShardResp resp = new DownloadShardResp();
try {
resp = (DownloadShardResp) P2PUtils.requestNode(req, node);
if (!resp.verify(req.getVHF())) {
LOG.error("VHF inconsistency.");
downloadBlock.onResponse(new DownloadShardResp());
} else {
downloadBlock.onResponse(resp);
}
} catch (ServiceException ex) {
LOG.error("Network error.");
downloadBlock.onResponse(new DownloadShardResp());
}
downloadBlock.onResponse(resp);
} finally {
queue.add(this);
}
......
......@@ -23,7 +23,7 @@ import java.util.Map;
public class UploadBlock {
private ShardRSEncoder rs;
private Block block;
private final Block block;
private final short id;
private final ShardNode[] nodes;
private final long VBI;
......
......@@ -9,9 +9,11 @@ import com.ytfs.service.packet.ShardNode;
import com.ytfs.service.packet.UploadShard2CResp;
import com.ytfs.service.packet.UploadShardReq;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.log4j.Logger;
public class UploadShard implements Runnable {
private static final Logger LOG = Logger.getLogger(UploadShard.class);
private static final ArrayBlockingQueue<UploadShard> queue;
static {
......@@ -45,6 +47,7 @@ public class UploadShard implements Runnable {
UploadShard2CResp resp = (UploadShard2CResp) P2PUtils.requestNode(req, node);
res.setRES(resp.getRES());
} catch (ServiceException ex) {
LOG.error("Network error.");
res.setRES(RES_NETIOERR);
}
uploadBlock.onResponse(res);
......
......@@ -3,7 +3,12 @@ package com.ytfs.service.codec;
public class Shard {
private final byte[] data;//数据分片内容
private final byte[] VHF;
private final byte[] VHF;
public Shard(byte[] data) {
this.data = data;
this.VHF = null;
}
public Shard(byte[] data, byte[] VHF) {
this.data = data;
......
......@@ -2,21 +2,28 @@ package com.ytfs.service.codec;
import static com.ytfs.service.UserConfig.Default_PND;
import com.ytfs.service.codec.erasure.ReedSolomon;
import java.io.IOException;
import java.util.List;
public class ShardRSDecoder {
private final List<Shard> shards;
private final int encryptedBlockSize;
private final Shard copyShard;
public ShardRSDecoder(List<Shard> shards, int encryptedBlockSize) {
this.shards = shards;
this.encryptedBlockSize = encryptedBlockSize;
this.copyShard = null;
}
public BlockEncrypted decode() throws IOException {
Shard shard = shards.get(0);
public ShardRSDecoder(Shard shard, int encryptedBlockSize) {
this.shards = null;
this.encryptedBlockSize = encryptedBlockSize;
this.copyShard = shard;
}
public BlockEncrypted decode() {
Shard shard = copyShard == null ? shards.get(0) : copyShard;
if (!shard.isRsShard()) {//副本
byte[] data = new byte[encryptedBlockSize];
System.arraycopy(shard.getData(), 1, data, 0, encryptedBlockSize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册