提交 4b5ad024 编写于 作者: Y youngowlf 提交者: youngwolf

1.2.5 release.

Fix:
Support unmovable buffers (for example: a very short std::string).
begin_time not inherited while accessing concurrent queue.
Eliminate race condition in udp::socket_base.
Eliminate race condition in demo file_client.
Avoid division by zero error in demo file_client.
上级 8b3a1c45
......@@ -17,9 +17,9 @@
#define RESTART_COMMAND "restart"
#define REQUEST_FILE "get"
std::atomic_ushort completed_client_num;
int link_num = 1;
fl_type file_size;
std::atomic_int_fast64_t received_size;
int main(int argc, const char* argv[])
{
......@@ -65,24 +65,12 @@ int main(int argc, const char* argv[])
str.erase(0, sizeof(REQUEST_FILE));
auto files = split_string(str);
do_something_to_all(files, [&](const std::string& item) {
completed_client_num = 0;
file_size = 0;
file_size = -1;
received_size = 0;
printf("transfer %s begin.\n", item.data());
if (client.find(0)->get_file(item))
{
//client.do_something_to_all([&item](file_client::object_ctype& item2) {if (0 != item2->id()) item2->get_file(item);});
//if you always return false, do_something_to_one will be equal to do_something_to_all.
client.do_something_to_one([&item](file_client::object_ctype& item2)->bool {if (0 != item2->id()) item2->get_file(item); return false;});
client.start();
while (completed_client_num != (unsigned short) link_num)
if (client.get_file(item))
while (client.is_transferring())
std::this_thread::sleep_for(std::chrono::milliseconds(50));
client.stop(item);
}
else
printf("transfer %s failed!\n", item.data());
});
}
else
......
......@@ -10,9 +10,9 @@ using namespace ascs::ext::tcp;
#include "unpacker.h"
extern std::atomic_ushort completed_client_num;
extern int link_num;
extern fl_type file_size;
extern std::atomic_int_fast64_t received_size;
class file_socket : public base_socket, public client_socket
{
......@@ -24,13 +24,6 @@ public:
virtual void reset() {clear(); client_socket::reset();}
void set_index(int index_) {index = index_;}
fl_type get_rest_size() const
{
auto up = std::dynamic_pointer_cast<const data_unpacker>(unpacker());
return up ? up->get_rest_size() : 0;
}
operator fl_type() const {return get_rest_size();}
bool get_file(const std::string& file_name)
{
assert(!file_name.empty());
......@@ -103,7 +96,7 @@ private:
unpacker(std::make_shared<ASCS_DEFAULT_UNPACKER>());
}
void trans_end() {clear(); ++completed_client_num;}
void trans_end() {clear();}
void handle_msg(out_msg_ctype& msg)
{
......@@ -182,26 +175,29 @@ public:
file_client(service_pump& service_pump_) : multi_client_base<file_socket>(service_pump_) {}
void start()
{
begin_time.restart();
set_timer(UPDATE_PROGRESS, 50, [this](tid id)->bool {return this->update_progress_handler(id, -1);});
}
void stop(const std::string& file_name)
bool is_transferring() const {return is_timer(UPDATE_PROGRESS);}
bool get_file(const std::string& file_name)
{
stop_timer(UPDATE_PROGRESS);
update_progress_handler(UPDATE_PROGRESS, 0);
printf("\ntransfer %s end, speed: %f MBps.\n", file_name.data(), file_size / begin_time.elapsed() / 1024 / 1024);
}
if (is_transferring())
printf("file transfer is ongoing for file %s", file_name.data());
else
{
printf("transfer %s begin.\n", file_name.data());
if (find(0)->get_file(file_name))
{
//do_something_to_all([&file_name](object_ctype& item) {if (0 != item->id()) item->get_file(file_name);});
//if you always return false, do_something_to_one will be equal to do_something_to_all.
do_something_to_one([&file_name](object_ctype& item)->bool {if (0 != item->id()) item->get_file(file_name); return false;});
begin_time.restart();
set_timer(UPDATE_PROGRESS, 50, [this](tid id)->bool {return this->update_progress_handler(id, -1);});
fl_type get_total_rest_size()
{
fl_type total_rest_size = 0;
do_something_to_all([&total_rest_size](object_ctype& item) {total_rest_size += *item;});
// do_something_to_all([&total_rest_size](object_ctype& item) {total_rest_size += item->get_rest_size();});
return true;
}
else
printf("transfer %s failed!\n", file_name.data());
}
return total_rest_size;
return false;
}
private:
......@@ -209,17 +205,25 @@ private:
{
assert(UPDATE_PROGRESS == id);
auto total_rest_size = get_total_rest_size();
auto new_percent = (unsigned) ((file_size - total_rest_size) * 100 / file_size);
if (last_percent != new_percent)
if (file_size < 0)
return true;
else if (file_size > 0)
{
printf("\r%u%%", new_percent);
fflush(stdout);
auto new_percent = (unsigned) (received_size * 100 / file_size);
if (last_percent != new_percent)
{
printf("\r%u%%", new_percent);
fflush(stdout);
this->update_timer_info(id, 50, [new_percent, this](tid id)->bool {return this->update_progress_handler(id, new_percent);});
update_timer_info(id, 50, [new_percent, this](tid id)->bool {return this->update_progress_handler(id, new_percent);});
}
}
return total_rest_size > 0;
if (received_size < file_size)
return true;
printf("\r100%%\nend, speed: %f MBps.\n", file_size / begin_time.elapsed() / 1024 / 1024);
return false;
}
protected:
......
......@@ -7,6 +7,8 @@ using namespace ascs::tcp;
#include "../file_server/common.h"
extern std::atomic_int_fast64_t received_size;
class data_unpacker : public i_unpacker<replaceable_buffer>
{
public:
......@@ -19,13 +21,12 @@ public:
}
~data_unpacker() {delete[] buffer;}
fl_type get_rest_size() const {return _data_len;}
virtual void reset() {_file = nullptr; delete[] buffer; buffer = nullptr; _data_len = 0;}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
assert(_data_len >= (fl_type) bytes_transferred && bytes_transferred > 0);
_data_len -= bytes_transferred;
received_size += bytes_transferred;
if (bytes_transferred != fwrite(buffer, 1, bytes_transferred, _file))
{
......
......@@ -404,6 +404,9 @@ struct obj_with_begin_time : public T
obj_with_begin_time() {}
obj_with_begin_time(T&& obj) : T(std::move(obj)) {restart();}
obj_with_begin_time& operator=(T&& obj) {T::operator=(std::move(obj)); restart(); return *this;}
//following two functions are used by concurrent queue only, ascs just use swap
obj_with_begin_time(obj_with_begin_time&& other) : T(std::move(other)), begin_time(std::move(other.begin_time)) {}
obj_with_begin_time& operator=(obj_with_begin_time&& other) {T::operator=(std::move(other)); begin_time = std::move(other.begin_time); return *this;}
void restart() {restart(statistic::now());}
void restart(const typename statistic::stat_time& begin_time_) {begin_time = begin_time_;}
......
......@@ -263,6 +263,28 @@
* REPLACEMENTS:
* Always use io_context instead of io_service (before asio 1.11, io_context will be a typedef of io_service).
*
* ===============================================================
* 2017.9.28 version 1.2.5
*
* SPECIAL ATTENTION (incompatible with old editions):
*
* HIGHLIGHT:
*
* FIX:
* Support unmovable buffers (for example: a very short std::string).
* begin_time not inherited while accessing concurrent queue.
* Eliminate race condition in udp::socket_base.
* Eliminate race condition in demo file_client.
* Avoid division by zero error in demo file_client.
*
* ENHANCEMENTS:
*
* DELETION:
*
* REFACTORING:
*
* REPLACEMENTS:
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -272,8 +294,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10204 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.2.4"
#define ASCS_VER 10205 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.2.5"
//asio and compiler check
#ifdef _MSC_VER
......
......@@ -186,9 +186,8 @@ protected:
virtual bool do_send_msg(in_msg_type&& msg)
{
auto buf = ASCS_SEND_BUFFER_TYPE(msg.data(), msg.size());
last_send_msg.emplace_back(std::move(msg));
asio::async_write(this->next_layer(), buf,
asio::async_write(this->next_layer(), ASCS_SEND_BUFFER_TYPE(last_send_msg.back().data(), last_send_msg.back().size()),
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);}));
return true;
}
......
......@@ -142,6 +142,7 @@ protected:
virtual bool do_send_msg(in_msg_type&& msg)
{
last_send_msg = std::move(msg);
std::lock_guard<std::mutex> lock(shutdown_mutex);
this->next_layer().async_send_to(ASCS_SEND_BUFFER_TYPE(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);}));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册