Browse Source

Improved event handling to reduce memory allocations and copying

master
Artyom Beilis 11 years ago
parent
commit
389a6d5006
7 changed files with 147 additions and 69 deletions
  1. +10
    -0
      booster/booster/aio/io_service.h
  2. +5
    -2
      booster/booster/callback.h
  3. +7
    -0
      booster/booster/intrusive_ptr.h
  4. +2
    -4
      booster/lib/aio/src/basic_io_device.cpp
  5. +118
    -28
      booster/lib/aio/src/io_service.cpp
  6. +0
    -25
      booster/lib/aio/src/socket_details.h
  7. +5
    -10
      booster/lib/aio/src/stream_socket.cpp

+ 10
- 0
booster/booster/aio/io_service.h View File

@@ -118,6 +118,16 @@ namespace aio {
void post(handler const &h);

///
/// Post event completion hander with its status
///
void post(event_handler const &h,booster::system::error_code const &e);
///
/// Post event i/o completion hander with its status and i/o size
///
void post(io_handler const &h,booster::system::error_code const &e,size_t n);


///
/// Get the real name of the reactor that io_service uses (calls reactor::name())
///



+ 5
- 2
booster/booster/callback.h View File

@@ -54,7 +54,7 @@ namespace booster {
template<typename Result,typename ...Params>
class callback<Result(Params...)>
{
public:
public:
///
/// Type of result, for use with boost::bind
///
@@ -124,6 +124,7 @@ namespace booster {
\
typedef callable<Result(BOOSTER_TEMPLATE_TYPE_PARAMS)> \
callable_type; \
typedef intrusive_ptr<callable_type> pointer_type; \
\
template<typename R,typename F> \
struct callable_impl : public callable_type { \
@@ -189,9 +190,11 @@ namespace booster {
operator bool() const { return !empty(); } \
\
void swap(callback &other) { call_ptr.swap(other.call_ptr); } \
pointer_type const &get_pointer() const { return call_ptr; } \
pointer_type &get_pointer() { return call_ptr; } \
\
private: \
intrusive_ptr<callable_type> call_ptr; \
pointer_type call_ptr; \
}; \

#define BOOSTER_TEMPLATE_PARAMS


+ 7
- 0
booster/booster/intrusive_ptr.h View File

@@ -58,6 +58,13 @@ public:
if(p_ != 0 && add_ref) intrusive_ptr_add_ref(p_);
}

T *release()
{
T *r = p_;
p_ = 0;
return r;
}

intrusive_ptr(intrusive_ptr const & rhs): p_(rhs.p_)
{
if(p_ != 0) intrusive_ptr_add_ref(p_);


+ 2
- 4
booster/lib/aio/src/basic_io_device.cpp View File

@@ -209,8 +209,7 @@ bool basic_io_device::dont_block(io_handler const &h)
system::error_code e;
set_non_blocking(true,e);
if(e) {
io_binder::pointer b(new io_binder( h, 0, e));
get_io_service().post(b);
get_io_service().post(h,e,0);
return false;
}
nonblocking_was_set_ = true;
@@ -224,8 +223,7 @@ bool basic_io_device::dont_block(event_handler const &h)
system::error_code e;
set_non_blocking(true,e);
if(e) {
event_binder::pointer b(new event_binder( h, e));
get_io_service().post(b);
get_io_service().post(h,e);
return false;
}
nonblocking_was_set_ = true;


+ 118
- 28
booster/lib/aio/src/io_service.cpp View File

@@ -113,6 +113,82 @@ typedef unique_lock<recursive_mutex> lock_guard;
class event_loop_impl {
public:
struct completion_handler {
booster::intrusive_ptr<booster::refcounted> h;
booster::system::error_code e;
size_t n;
enum { none, op_handler, op_event_handler, op_io_handler } type;
completion_handler() :
n(0),
type(none)
{
}
completion_handler(handler const &inh) :
h(inh.get_pointer()),
n(0),
type(op_handler)
{
}
completion_handler(handler &inh) :
h(inh.get_pointer().release(),false),
n(0),
type(op_handler)
{
}
completion_handler(event_handler const &inh,booster::system::error_code const &ine) :
h(inh.get_pointer()),
e(ine),
n(0),
type(op_event_handler)
{
}
completion_handler(event_handler &inh,booster::system::error_code const &ine) :
h(inh.get_pointer().release(),false),
e(ine),
n(0),
type(op_event_handler)
{
}
completion_handler(io_handler const &inh,booster::system::error_code const &ine,size_t inn) :
h(inh.get_pointer()),
e(ine),
n(inn),
type(op_io_handler)
{
}
completion_handler(io_handler &inh,booster::system::error_code const &ine,size_t inn) :
h(inh.get_pointer().release(),false),
e(ine),
n(inn),
type(op_io_handler)
{
}

void operator()()
{
booster::refcounted &call = *h;
switch(type) {
case none:
break;
case op_handler:
static_cast<handler::callable_type &>(call)();
break;
case op_event_handler:
static_cast<event_handler::callable_type &>(call)(e);
break;
case op_io_handler:
static_cast<io_handler::callable_type &>(call)(e,n);
break;
}
}
void swap(completion_handler &other) {
h.swap(other.h);
std::swap(e,other.e);
std::swap(n,other.n);
std::swap(type,other.type);
}
};
void set_io_event(native_type fd,int event,event_handler const &h)
{
if(event != io_events::in && event !=io_events::out)
@@ -169,7 +245,21 @@ public:
void post(handler const &h)
{
lock_guard l(data_mutex_);
dispatch_queue_.push_back(h);
dispatch_queue_.push_back(completion_handler(h));
if(polling_)
wake();
}
void post(event_handler const &h,booster::system::error_code const &e)
{
lock_guard l(data_mutex_);
dispatch_queue_.push_back(completion_handler(h,e));
if(polling_)
wake();
}
void post(io_handler const &h,booster::system::error_code const &e,size_t n)
{
lock_guard l(data_mutex_);
dispatch_queue_.push_back(completion_handler(h,e,n));
if(polling_)
wake();
}
@@ -231,7 +321,7 @@ public:

timer_events_type::iterator evptr = timer_events_index_[event_id];
event_handler_dispatcher evdisp(evptr->second.h,system::error_code(aio_error::canceled,aio_error_cat));
completion_handler evdisp(evptr->second.h,system::error_code(aio_error::canceled,aio_error_cat));
dispatch_queue_.push_back(evdisp);
timer_events_.erase(evptr);
timer_events_index_[event_id]=timer_events_.end();
@@ -281,7 +371,7 @@ private:
//
socket_map<io_data> map_;
// events dispatch queue
std::deque<handler> dispatch_queue_;
std::deque<completion_handler> dispatch_queue_;

void closesocket(native_type fd)
{
@@ -322,9 +412,9 @@ private:
e = system::error_code(aio_error::canceled,aio_error_cat);
// Maybe it is closed
if(cont.readable)
self_->dispatch_queue_.push_back(event_handler_dispatcher(cont.readable,e));
self_->dispatch_queue_.push_back(completion_handler(cont.readable,e));
if(cont.writeable)
self_->dispatch_queue_.push_back(event_handler_dispatcher(cont.writeable,e));
self_->dispatch_queue_.push_back(completion_handler(cont.writeable,e));
self_->map_.erase(fd);
}
};
@@ -348,7 +438,7 @@ private:
#else
system::error_code e(EBADF,syscat);
#endif
self_->dispatch_queue_.push_back(event_handler_dispatcher(h,e));
self_->dispatch_queue_.push_back(completion_handler(h,e));
return;
}

@@ -363,7 +453,7 @@ private:
self_->map_[fd].writeable = h;
}
else {
self_->dispatch_queue_.push_back(event_handler_dispatcher(h,e));
self_->dispatch_queue_.push_back(completion_handler(h,e));
}
}
};
@@ -408,7 +498,7 @@ private:
{
lock_guard l(data_mutex_);
if(polling_ || !reactor_.get()) {
dispatch_queue_.push_back(f);
dispatch_queue_.push_back(completion_handler(f));
if(reactor_.get())
wake();
}
@@ -422,7 +512,7 @@ private:
if(!f.cancelation_is_needed_with_data_mutex_locked())
return;
if(polling_ || !reactor_.get()) {
dispatch_queue_.push_back(f);
dispatch_queue_.push_back(completion_handler(f));
if(reactor_.get())
wake();
}
@@ -431,19 +521,6 @@ private:
}
}

struct event_handler_dispatcher {
event_handler_dispatcher(event_handler &hn,system::error_code const &err) : e(err)
{
h.swap(hn);
}
event_handler h;
system::error_code e;
void operator()() const
{
h(e);
}
};

bool run_one(reactor::event *evs,size_t evs_size)
{
lock_guard l(data_mutex_);
@@ -456,7 +533,7 @@ private:

int counter = dispatch_queue_.size();
while(!stop_ && !dispatch_queue_.empty() && counter > 0) {
handler exec;
completion_handler exec;
exec.swap(dispatch_queue_.front());
dispatch_queue_.pop_front();
@@ -478,7 +555,7 @@ private:
while(!stop_ && !timer_events_.empty() && timer_events_.begin()->first <= now) {
timer_events_type::iterator evptr = timer_events_.begin();
timer_events_index_[evptr->second.event_id] = timer_events_.end();
event_handler_dispatcher disp(evptr->second.h,system::error_code());
completion_handler disp(evptr->second.h,system::error_code());
dispatch_queue_.push_back(disp);
timer_events_.erase(evptr);
}
@@ -569,10 +646,12 @@ private:
cont.current_event = new_events;

if(cont.readable && (new_events & reactor::in) == 0)
dispatch_queue_.push_back(event_handler_dispatcher(cont.readable,dispatch_error));
if(cont.writeable && (new_events & reactor::out) == 0)
dispatch_queue_.push_back(event_handler_dispatcher(cont.writeable,dispatch_error));
if(cont.readable && (new_events & reactor::in) == 0) {
dispatch_queue_.push_back(completion_handler(cont.readable,dispatch_error));
}
if(cont.writeable && (new_events & reactor::out) == 0) {
dispatch_queue_.push_back(completion_handler(cont.writeable,dispatch_error));
}
if(new_events == 0)
map_.erase(evs[i].fd);
@@ -643,6 +722,17 @@ void io_service::post(handler const &h)
impl_->post(h);
}

void io_service::post(event_handler const &h,booster::system::error_code const &e)
{
impl_->post(h,e);
}

void io_service::post(io_handler const &h,booster::system::error_code const &e,size_t n)
{
impl_->post(h,e,n);
}


int io_service::set_timer_event(ptime const &t,event_handler const &h)
{
return impl_->set_timer_event(t,h);


+ 0
- 25
booster/lib/aio/src/socket_details.h View File

@@ -64,31 +64,6 @@ namespace socket_details {
return res;
}


struct io_binder : public callable<void()> {
typedef std::auto_ptr<io_binder> pointer;
io_handler h;
size_t n;
system::error_code e;
io_binder(io_handler const &ih,size_t in,system::error_code const &ie) : h(ih),n(in),e(ie) {}
void operator()()
{
h(e,n);
}
};

struct event_binder : public callable<void()> {
event_handler h;
system::error_code e;
event_binder(event_handler const &ih,system::error_code const &ie) : h(ih),e(ie) {}
typedef std::auto_ptr<event_binder> pointer;
void operator()()
{
h(e);
}
};


} // socket_details

using namespace socket_details;


+ 5
- 10
booster/lib/aio/src/stream_socket.cpp View File

@@ -370,8 +370,7 @@ namespace {
count+=n;
buf+=n;
if(buf.empty() || (e && !basic_io_device::would_block(e))) {
io_binder::pointer binder(new io_binder( h, count, e));
self->get_io_service().post(binder);
self->get_io_service().post(h,e,count);
}
else {
self->on_readable(intrusive_ptr<reader_all>(this));
@@ -429,8 +428,7 @@ namespace {
count+=n;
buf+=n;
if(buf.empty() || (e && !basic_io_device::would_block(e))) {
io_binder::pointer binder(new io_binder( h, count, e ));
self->get_io_service().post(binder);
self->get_io_service().post(h,e,count);
}
else {
self->on_writeable(intrusive_ptr<writer_all>(this));
@@ -482,8 +480,7 @@ void stream_socket::async_write_some(const_buffer const &buffer,io_handler const
on_writeable(writer);
}
else {
io_binder::pointer binder(new io_binder( h,n,e ));
get_io_service().post(binder);
get_io_service().post(h,e,n);
}
#endif
}
@@ -503,8 +500,7 @@ void stream_socket::async_read_some(mutable_buffer const &buffer,io_handler cons
on_readable(reader);
}
else {
io_binder::pointer binder(new io_binder( h,n,e ));
get_io_service().post(binder);
get_io_service().post(h,e,n);
}
#endif
}
@@ -520,8 +516,7 @@ void stream_socket::async_connect(endpoint const &ep,event_handler const &h)
on_writeable(connector);
}
else {
event_binder::pointer binder(new event_binder( h,e ));
get_io_service().post(binder);
get_io_service().post(h,e);
}
}



Loading…
Cancel
Save