Browse Source

Added async_write based on nonblocking_write

master
Artyom Beilis 8 years ago
parent
commit
a13c972b8b
9 changed files with 239 additions and 166 deletions
  1. +14
    -0
      booster/booster/aio/basic_io_device.h
  2. +14
    -0
      booster/lib/aio/src/basic_io_device.cpp
  3. +15
    -3
      private/cgi_api.h
  4. +154
    -40
      src/cgi_api.cpp
  5. +2
    -59
      src/fastcgi_api.cpp
  6. +10
    -24
      src/http_api.cpp
  7. +22
    -16
      src/http_response.cpp
  8. +2
    -19
      src/scgi_api.cpp
  9. +6
    -5
      tests/dummy_api.h

+ 14
- 0
booster/booster/aio/basic_io_device.h View File

@@ -133,6 +133,20 @@ namespace aio {
/// If a error occurs it is assigned to \a e.
///
void set_non_blocking(bool nonblocking,system::error_code &e);
///
/// Set the object to blocking or non-blocking mode. It checks if set_non_blocking() was
/// called before and does nothing if previous call matches the request
///
/// Throws system::system_error if error occurs.
///
void set_non_blocking_if_needed(bool nonblocking);
///
/// Set the object to blocking or non-blocking mode. It checks if set_non_blocking() was
/// called before and does nothing if previous call matches the request
///
/// If a error occurs it is assigned to \a e.
///
void set_non_blocking_if_needed(bool nonblocking,system::error_code &e);

///
/// Check if a error code \a e reports that the non-blocking operation would block


+ 14
- 0
booster/lib/aio/src/basic_io_device.cpp View File

@@ -164,6 +164,20 @@ void basic_io_device::set_non_blocking(bool nonblocking,system::error_code &e)
nonblocking_was_set_=nonblocking;
}

void basic_io_device::set_non_blocking_if_needed(bool nonblocking)
{
if(nonblocking_was_set_ == nonblocking)
return;
set_non_blocking(nonblocking);
}

void basic_io_device::set_non_blocking_if_needed(bool nonblocking,booster::system::error_code &e)
{
if(nonblocking_was_set_ == nonblocking)
return;
set_non_blocking(nonblocking,e);
}

void basic_io_device::set_non_blocking(bool nonblocking)
{
system::error_code e;


+ 15
- 3
private/cgi_api.h View File

@@ -111,11 +111,19 @@ namespace cgi {
// These are abstract member function that should be implemented by
// actual protocol like FCGI, SCGI, HTTP or CGI
public:
virtual void async_write(booster::aio::const_buffer const &buf,io_handler const &h,bool eof) = 0;
virtual size_t write(booster::aio::const_buffer const &buf,booster::system::error_code &e,bool eof) = 0;
bool has_pending();
virtual bool nonblocking_write(booster::aio::const_buffer const &buf,bool eof,booster::system::error_code &e);
virtual void async_write(booster::aio::const_buffer const &buf,bool eof,handler const &h);
virtual bool write(booster::aio::const_buffer const &buf,bool eof,booster::system::error_code &e);

virtual void on_some_output_written() = 0;
virtual void do_eof() = 0;
virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool completed,booster::system::error_code &e) = 0;
virtual bool write_to_socket(booster::aio::const_buffer const &in,booster::system::error_code &e);
protected:
void append_pending(booster::aio::const_buffer const &new_data);

virtual booster::aio::stream_socket &socket() = 0;


virtual void async_read_headers(handler const &h) = 0;
@@ -134,6 +142,7 @@ namespace cgi {
string_pool pool_;
string_map env_;
std::vector<char> pending_output_;

booster::shared_ptr<connection> self();
void async_read(void *,size_t,io_handler const &h);
@@ -142,10 +151,12 @@ namespace cgi {
struct reader;
struct cgi_forwarder;
struct async_write_binder;
struct async_write_handler;

friend struct reader;
friend struct writer;
friend struct async_write_binder;
friend struct async_write_handler;
friend struct cgi_forwarder;

void set_error(ehandler const &h,std::string s);
@@ -155,7 +166,8 @@ namespace cgi {
void on_some_multipart_read(booster::system::error_code const &e,size_t n,http::context *,ehandler const &h);
void handle_eof(callback const &on_eof);
void handle_http_error(int code,http::context *context,ehandler const &h);
void handle_http_error_eof(booster::system::error_code const &e,size_t n,int code,ehandler const &h);
void handle_http_error_eof(booster::system::error_code const &e,int code,ehandler const &h);
booster::intrusive_ptr<connection::async_write_binder> get_write_binder(ehandler const &h,bool complete_response);

std::vector<char> content_;
cppcms::service *service_;


+ 154
- 40
src/cgi_api.cpp View File

@@ -121,18 +121,14 @@ namespace cppcms { namespace impl { namespace cgi {
void on_response_read(booster::system::error_code const &e,size_t len)
{
if(e) {
conn_->async_write(booster::aio::const_buffer(),
boost::bind(&cgi_forwarder::cleanup,shared_from_this()),
true);
conn_->async_write(booster::aio::const_buffer(),true,boost::bind(&cgi_forwarder::cleanup,shared_from_this()));
return;
}
else {
conn_->async_write(booster::aio::buffer(&response_.front(),len),
boost::bind(&cgi_forwarder::on_response_written,shared_from_this(),_1,_2),
false);
conn_->async_write(booster::aio::buffer(&response_.front(),len),false,boost::bind(&cgi_forwarder::on_response_written,shared_from_this(),_1));
}
}
void on_response_written(booster::system::error_code const &e,size_t /*len*/)
void on_response_written(booster::system::error_code const &e)
{
if(e) { cleanup(); return; }
scgi_.async_read_some(booster::aio::buffer(response_),
@@ -263,20 +259,17 @@ void connection::handle_http_error(int code,http::context *context,ehandler cons
async_chunk_ += "</h1>\r\n"
"</body>\r\n"
"</html>\r\n";
async_write(booster::aio::buffer(async_chunk_),
async_write(booster::aio::buffer(async_chunk_),true,
boost::bind(
&connection::handle_http_error_eof,
self(),
_1,
_2,
code,
h),
true);
h));
}

void connection::handle_http_error_eof(
booster::system::error_code const &e,
size_t /*n*/,
int code,
ehandler const &h)
{
@@ -428,59 +421,180 @@ std::string connection::last_error()
return error_;
}

struct connection::async_write_binder : public booster::callable<void(booster::system::error_code const &,size_t)> {
typedef booster::shared_ptr<cppcms::impl::cgi::connection> self_type;
self_type self;
struct connection::async_write_binder : public booster::callable<void(booster::system::error_code const &)> {
typedef booster::shared_ptr<cppcms::impl::cgi::connection> conn_type;
conn_type conn;
ehandler h;
bool complete_response;
booster::shared_ptr<std::vector<char> > block;
async_write_binder() :complete_response(false) {}
void init(self_type c,bool comp,ehandler const &hnd,booster::shared_ptr<std::vector<char> > const &b)
{
self=c;
complete_response = comp;
h = hnd;
block = b;
}

void reset()
{
h=ehandler();
self.reset();
conn.reset();
complete_response = false;
block.reset();
}
void operator()(booster::system::error_code const &e,size_t)
void operator()(booster::system::error_code const &e)
{
if(complete_response) {
self->do_eof();
conn->do_eof();
}
h(e ? cppcms::http::context::operation_aborted : cppcms::http::context::operation_completed );
if(!self->cached_async_write_binder_) {
self->cached_async_write_binder_ = this;
if(!conn->cached_async_write_binder_) {
conn->cached_async_write_binder_ = this;
reset();
}
}
};

booster::intrusive_ptr<connection::async_write_binder> connection::get_write_binder(ehandler const &h,bool complete_response)
{
booster::intrusive_ptr<connection::async_write_binder> tmp;
if(cached_async_write_binder_) {
tmp.swap(cached_async_write_binder_);
}
if(!tmp) {
tmp = new connection::async_write_binder();
}
tmp->conn = self();
tmp->h = h;
tmp->complete_response = complete_response;
return tmp;
}

void connection::async_write_response( http::response &response,
bool complete_response,
ehandler const &h)
{
booster::intrusive_ptr<async_write_binder> tmp = get_write_binder(h,complete_response);
booster::system::error_code e;
http::response::chunk_type chunk = response.get_async_chunk();
if(chunk.second > 0) {
booster::intrusive_ptr<async_write_binder> binder;
binder.swap(cached_async_write_binder_);
if(!binder)
binder = new async_write_binder();

binder->init(self(),complete_response,h,chunk.first);
booster::intrusive_ptr<booster::callable<void(booster::system::error_code const &,size_t)> > p(binder.get());
async_write( booster::aio::buffer(&(*chunk.first)[0],chunk.second),p,complete_response);
if(chunk.second > 0 || complete_response || has_pending()) {
booster::aio::const_buffer out;
if(chunk.second > 0)
out = booster::aio::buffer(&chunk.first->front(),chunk.second);
if(nonblocking_write(out,complete_response,e) || e) {
get_io_service().post(tmp,e);
return;
}
async_write(booster::aio::const_buffer(),false,tmp);
return;
}
// request to send an empty block
get_io_service().post(boost::bind(h,http::context::operation_completed));
get_io_service().post(tmp,e);
}

bool connection::has_pending()
{
return !pending_output_.empty();
}

void connection::append_pending(booster::aio::const_buffer const &new_data)
{
size_t pos = pending_output_.size();
pending_output_.resize(pending_output_.size() + new_data.bytes_count());
std::pair<booster::aio::const_buffer::entry const *,size_t> packets = new_data.get();
for(size_t i=0;i<packets.second;i++) {
memcpy(&pending_output_[pos],packets.first[i].ptr,packets.first[i].size);
pos += packets.first[i].size;
}
}

bool connection::write(booster::aio::const_buffer const &buf,bool eof,booster::system::error_code &e)
{
booster::aio::const_buffer new_data = format_output(buf,eof,e);
if(e) return false;
booster::aio::const_buffer output = pending_output_.empty() ? new_data : (booster::aio::buffer(pending_output_) + new_data);
socket().set_non_blocking_if_needed(false,e);
if(e) return false;

bool r=write_to_socket(output,e);
pending_output_.clear();
return r;
}

bool connection::write_to_socket(booster::aio::const_buffer const &in,booster::system::error_code &e)
{
return socket().write(in,e) == in.bytes_count();
}

bool connection::nonblocking_write(booster::aio::const_buffer const &buf,bool eof,booster::system::error_code &e)
{
booster::aio::const_buffer new_data = format_output(buf,eof,e);
if(e) return false;
booster::aio::const_buffer output = pending_output_.empty() ? new_data : (booster::aio::buffer(pending_output_) + new_data);
socket().set_non_blocking_if_needed(true,e);
if(e) return false;

size_t n = socket().write_some(output,e);
if(n == output.bytes_count()) {
pending_output_.clear();
return true;
}
if(n == 0) {
append_pending(new_data);
}
else {
std::vector<char> tmp;
pending_output_.swap(tmp);
// after swapping output still points to a valid buffer
append_pending(output + n);
}
if(e && socket().would_block(e)) {
e=booster::system::error_code();
return false;
}
return false;
}

struct connection::async_write_handler : public booster::callable<void(booster::system::error_code const &e)>
{
typedef booster::shared_ptr<cppcms::impl::cgi::connection> conn_type;
typedef booster::intrusive_ptr< booster::callable<void(booster::system::error_code const &)> > self_type;
std::vector<char> data;
booster::aio::const_buffer output;
handler h;
conn_type conn;

async_write_handler(conn_type const &c,std::vector<char> &d,handler const &hin) :
h(hin),
conn(c)
{
data.swap(d);
output = booster::aio::buffer(data);
}

virtual void operator()(booster::system::error_code const &ein)
{
if(ein) { h(ein); return; }
booster::system::error_code e;
conn->socket().set_non_blocking_if_needed(true,e);
size_t n = conn->socket().write_some(output,e);
if(n!=0)
conn->on_some_output_written();
output += n;
if(output.empty()) {
h(e);
return;
}
if(e && booster::aio::basic_io_device::would_block(e)) {
conn->socket().on_writeable(self_type(this));
}
h(e);
}
};

void connection::async_write(booster::aio::const_buffer const &buf,bool eof,handler const &h)
{
booster::system::error_code e;
if(nonblocking_write(buf,eof,e) || e) {
if(!e) on_some_output_written();
get_io_service().post(h,e);
return;
}
async_write_handler::self_type p(new async_write_handler(self(),pending_output_,h));
socket().on_writeable(p);
}




+ 2
- 59
src/fastcgi_api.cpp View File

@@ -144,26 +144,6 @@ namespace cgi {
h(booster::system::error_code(),s);
}
public:
static void print(booster::aio::const_buffer const &in,bool eof)
{
std::pair<booster::aio::const_buffer::entry const *,size_t> r = in.get();
for(size_t i=0;i<r.second;i++) {
std::cout.write(r.first[i].ptr,r.first[i].size);
}
std::cout << (eof ? "EOF" : "---")<< std::endl;
}
virtual void async_write(booster::aio::const_buffer const &in,io_handler const &h,bool eof)
{
print(in,eof);
booster::system::error_code dummy;
do_write(in,h,true,dummy,eof);
}
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &e,bool eof)
{
print(in,eof);
return do_write(in,io_handler(),false,e,eof);
}

virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool completed,booster::system::error_code &)
{
@@ -226,45 +206,8 @@ namespace cgi {
}
return packet;
}
virtual size_t do_write(booster::aio::const_buffer const &in,io_handler const &h,bool async,booster::system::error_code &e,bool eof)
{
size_t s = in.bytes_count();
io::const_buffer packet = format_output(in,eof,e);

if(e)
return 0;
if(async) {
if(packet.empty()) {
socket_.get_io_service().post(h,booster::system::error_code(),0);
return 0;
}
socket_.async_write(
packet,
boost::bind( h,
_1,
s));
return s;
}
else {
if(packet.empty())
return 0;
booster::system::error_code err;
size_t res = socket_.write(packet,err);
if(err && io::basic_socket::would_block(err)) {
socket_.set_non_blocking(false);
packet+=res;
socket_.write(packet,e);
return s;
}
else if(err) {
e=err;
return 0;
}
else
return s;
}
}
virtual void on_some_output_written() {}
virtual booster::aio::stream_socket &socket() { return socket_; }
virtual booster::aio::io_service &get_io_service()
{
return socket_.get_io_service();


+ 10
- 24
src/http_api.cpp View File

@@ -316,27 +316,6 @@ namespace cgi {
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.close(e);
}
virtual void async_write(booster::aio::const_buffer const &in,io_handler const &h,bool eof)
{
update_time();
watchdog_->add(self());
booster::system::error_code e;
booster::aio::const_buffer packet = format_output(in,eof,e);
if(e) {
socket_.get_io_service().post(h,e,0);
return;
}
socket_.async_write(packet,h);
}
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &e,bool eof)
{

booster::aio::const_buffer packet = format_output(in,eof,e);
if(e) return 0;
write_to_socket(packet,e);
if(e) return 0;
return in.bytes_count();
}
#ifndef CPPCMS_NO_SO_SNDTIMO
size_t timed_write_some(booster::aio::const_buffer const &buf,booster::system::error_code &e)
{
@@ -392,7 +371,7 @@ namespace cgi {
return 0;
}
#endif
size_t write_to_socket(booster::aio::const_buffer const &bufin,booster::system::error_code &e)
bool write_to_socket(booster::aio::const_buffer const &bufin,booster::system::error_code &e)
{
booster::aio::const_buffer buf = bufin;
size_t total = 0;
@@ -405,12 +384,12 @@ namespace cgi {
break;
}
}
return total;
return total == bufin.bytes_count();
}
void set_sync_options(booster::system::error_code &e)
{
if(!sync_option_is_set_) {
socket_.set_non_blocking(false,e);
socket_.set_non_blocking_if_needed(false,e);
if(e)
return;
cppcms::impl::set_send_timeout(socket_,timeout_,e);
@@ -442,8 +421,15 @@ namespace cgi {
socket_.async_read_some(io::buffer(&a,1),boost::bind(h));
}

void on_some_output_written()
{
update_time();
}
virtual booster::aio::stream_socket &socket() { return socket_; }
virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool /*completed*/,booster::system::error_code &e)
{
update_time();
add_to_watchdog();
if(headers_done_)
return in;
booster::aio::const_buffer packet;


+ 22
- 16
src/http_response.cpp View File

@@ -54,7 +54,13 @@ namespace details {
return ss.str();
}

class copy_buf : public std::streambuf {
class closeable {
public:
virtual void close() = 0;
virtual ~closeable() {}
};

class copy_buf : public std::streambuf, public closeable {
public:
copy_buf(std::streambuf *output = 0) :
out_(output)
@@ -176,7 +182,7 @@ namespace details {


#ifndef CPPCMS_NO_GZIP
class gzip_buf : public basic_obuf<gzip_buf> {
class gzip_buf : public basic_obuf<gzip_buf>, public closeable {
public:
gzip_buf(size_t n = 0) :
basic_obuf<gzip_buf>(n),
@@ -281,7 +287,7 @@ namespace details {

#endif

class output_device : public basic_obuf<output_device> {
class output_device : public basic_obuf<output_device>, public closeable {
booster::weak_ptr<impl::cgi::connection> conn_;
bool final_;
bool eof_send_;
@@ -313,13 +319,13 @@ namespace details {
booster::system::error_code e;

int res = c->write(in,e,set_eof);
bool res = c->write(in,set_eof,e);
if(e) {
BOOSTER_WARNING("cppcms") << "Failed to write response:" << e.message();
conn_.reset();
return -1;
}
if(res!=int(in.bytes_count()))
if(!res)
return -1;
if(set_eof)
eof_send_ = true;
@@ -337,6 +343,7 @@ struct response::_data {
typedef std::map<std::string,std::string,compare_type> headers_type;
headers_type headers;
std::list<std::string> added_headers;
std::list<details::closeable *> buffers;

details::copy_buf buffered;
details::copy_buf cached;
@@ -423,15 +430,8 @@ void response::finalize()
{
if(!finalized_) {
out();
#ifndef CPPCMS_NO_GZIP
d->zbuf.close();
#endif
d->cached.close();

if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw)
d->buffered.close();
else
d->output_buf.close();
for(std::list<details::closeable *>::iterator p=d->buffers.begin();p!=d->buffers.end();++p)
(*p)->close();
finalized_=1;
}
}
@@ -546,10 +546,14 @@ std::ostream &response::out()
if(finalized_)
throw cppcms_error("Request for output stream for finalized request is illegal");
if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw)
if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw) {
d->output.rdbuf(&d->buffered);
else
d->buffers.push_front(&d->buffered);
}
else {
d->output.rdbuf(&d->output_buf);
d->buffers.push_front(&d->output_buf);
}
ostream_requested_=1;
@@ -568,6 +572,7 @@ std::ostream &response::out()
if(copy_to_cache_) {
d->cached.open(d->output.rdbuf());
d->output.rdbuf(&d->cached);
d->buffers.push_front(&d->cached);
}
#ifndef CPPCMS_NO_GZIP
@@ -576,6 +581,7 @@ std::ostream &response::out()
int buffer=context_.service().cached_settings().gzip.buffer;
d->zbuf.open(d->output.rdbuf(),level,buffer);
d->output.rdbuf(&d->zbuf);
d->buffers.push_front(&d->zbuf);
}
#endif


+ 2
- 19
src/scgi_api.cpp View File

@@ -122,25 +122,6 @@ namespace cgi {
{
socket_.async_read_some(io::buffer(p,s),h);
}
virtual void async_write(booster::aio::const_buffer const &in,io_handler const &h,bool /*eof*/)
{
socket_.async_write(in,h);
}
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &e,bool /*eof*/)
{
booster::system::error_code err;
size_t res = socket_.write(in,err);
if(err && io::basic_socket::would_block(err)) {
socket_.set_non_blocking(false);
return socket_.write_some(in,e);
}
else if(err) {
e=err;
return res;
}
else
return res;
}
virtual booster::aio::io_service &get_io_service()
{
return socket_.get_io_service();
@@ -163,6 +144,8 @@ namespace cgi {
socket_.async_read_some(io::buffer(&a,1),boost::bind(h));
}

virtual void on_some_output_written() {}
virtual booster::aio::stream_socket &socket() { return socket_; }
private:
size_t start_,end_,sep_;
booster::shared_ptr<scgi> self()


+ 6
- 5
tests/dummy_api.h View File

@@ -38,18 +38,19 @@ public:
throw std::runtime_error("dummy_api: unsupported");
}

void async_write(booster::aio::const_buffer const &,io_handler const &,bool)
{
throw std::runtime_error("dummy_api: unsupported");
}
virtual void do_eof(){}
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &,bool )
virtual void on_some_output_written() {}
virtual bool write(booster::aio::const_buffer const &in,bool,booster::system::error_code &)
{
std::pair<booster::aio::const_buffer::entry const *,size_t> all=in.get();
for(size_t i=0;i<all.second;i++)
output_->append(reinterpret_cast<char const *>(all.first[i].ptr),all.first[i].size);
return in.bytes_count();
}
virtual booster::aio::stream_socket &socket()
{
throw std::runtime_error("dummy_api: unsupported");
}
virtual booster::aio::io_service &get_io_service()
{
throw std::runtime_error("dummy_api: unsupported");


Loading…
Cancel
Save