Browse Source

Implemented preparation for non-blocking API

master
Artyom Beilis 8 years ago
parent
commit
cea1147147
8 changed files with 204 additions and 211 deletions
  1. +5
    -11
      private/cgi_api.h
  2. +19
    -53
      src/cgi_api.cpp
  3. +70
    -58
      src/fastcgi_api.cpp
  4. +60
    -59
      src/http_api.cpp
  5. +1
    -2
      src/http_context.cpp
  6. +28
    -7
      src/http_response.cpp
  7. +10
    -12
      src/scgi_api.cpp
  8. +11
    -9
      tests/dummy_api.h

+ 5
- 11
private/cgi_api.h View File

@@ -11,6 +11,7 @@
#include <booster/noncopyable.h>
#include <booster/shared_ptr.h>
#include <booster/enable_shared_from_this.h>
#include <booster/aio/buffer.h>
#include <vector>
#include <map>
#include <booster/callback.h>
@@ -76,10 +77,6 @@ namespace cgi {
void async_write_response( http::response &response,
bool complete_response,
ehandler const &on_response_written);

void async_complete_response( ehandler const &on_response_complete);

void complete_response();
void aync_wait_for_close_by_peer(callback const &on_eof);

@@ -114,8 +111,10 @@ namespace cgi {
// These are abstract member function that should be implemented by
// actual protocol like FCGI, SCGI, HTTP or CGI
public:
virtual void async_write(void const *,size_t,io_handler const &h) = 0;
virtual size_t write(void const *,size_t,booster::system::error_code &e) = 0;
virtual void async_write(booster::aio::const_buffer const &buf,io_handler const &h,bool eof) = 0;
virtual size_t write(booster::aio::const_buffer const &buf,booster::system::error_code &e,bool eof) = 0;
virtual void do_eof() = 0;
virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool completed,booster::system::error_code &e) = 0;
protected:


@@ -127,8 +126,6 @@ namespace cgi {
virtual void async_read_some(void *,size_t,io_handler const &h) = 0;
virtual void on_async_read_complete() {}
virtual void async_read_eof(callback const &h) = 0;
virtual void async_write_eof(handler const &h) = 0;
virtual void write_eof() = 0;
virtual booster::aio::io_service &get_io_service() = 0;

/****************************************************************************/
@@ -156,12 +153,9 @@ namespace cgi {
void load_content(booster::system::error_code const &e,http::context *,ehandler const &h);
void on_post_data_loaded(booster::system::error_code const &e,http::context *,ehandler const &h);
void on_some_multipart_read(booster::system::error_code const &e,size_t n,http::context *,ehandler const &h);
void on_async_write_written(booster::system::error_code const &e,bool complete_response,ehandler const &h);
void on_eof_written(booster::system::error_code const &e,ehandler const &h);
void handle_eof(callback const &on_eof);
void handle_http_error(int code,http::context *context,ehandler const &h);
void handle_http_error_eof(booster::system::error_code const &e,size_t n,int code,ehandler const &h);
void handle_http_error_done(booster::system::error_code const &e,int code,ehandler const &h);

std::vector<char> content_;
cppcms::service *service_;


+ 19
- 53
src/cgi_api.cpp View File

@@ -121,11 +121,15 @@ namespace cppcms { namespace impl { namespace cgi {
void on_response_read(booster::system::error_code const &e,size_t len)
{
if(e) {
conn_->async_write_eof(boost::bind(&cgi_forwarder::cleanup,shared_from_this()));
conn_->async_write(booster::aio::const_buffer(),
boost::bind(&cgi_forwarder::cleanup,shared_from_this()),
true);
return;
}
else {
conn_->async_write(&response_.front(),len,boost::bind(&cgi_forwarder::on_response_written,shared_from_this(),_1,_2));
conn_->async_write(booster::aio::buffer(&response_.front(),len),
boost::bind(&cgi_forwarder::on_response_written,shared_from_this(),_1,_2),
false);
}
}
void on_response_written(booster::system::error_code const &e,size_t /*len*/)
@@ -259,14 +263,15 @@ void connection::handle_http_error(int code,http::context *context,ehandler cons
async_chunk_ += "</h1>\r\n"
"</body>\r\n"
"</html>\r\n";
async_write(async_chunk_.c_str(),async_chunk_.size(),
async_write(booster::aio::buffer(async_chunk_),
boost::bind(
&connection::handle_http_error_eof,
self(),
_1,
_2,
code,
h));
h),
true);
}

void connection::handle_http_error_eof(
@@ -279,18 +284,12 @@ void connection::handle_http_error_eof(
set_error(h,e.message());
return;
}
async_write_eof(boost::bind(&connection::handle_http_error_done,self(),_1,code,h));
}

void connection::handle_http_error_done(booster::system::error_code const &e,int code,ehandler const &h)
{
if(e) {
set_error(h,e.message());
return;
}
do_eof();
set_error(h,http::response::status_to_string(code));
}



void connection::load_content(booster::system::error_code const &e,http::context *context,ehandler const &h)
{
if(e) {
@@ -452,7 +451,10 @@ struct connection::async_write_binder : public booster::callable<void(booster::s
}
void operator()(booster::system::error_code const &e,size_t)
{
self->on_async_write_written(e,complete_response,h);
if(complete_response) {
self->do_eof();
}
h(e ? cppcms::http::context::operation_aborted : cppcms::http::context::operation_completed );
if(!self->cached_async_write_binder_) {
self->cached_async_write_binder_ = this;
reset();
@@ -474,49 +476,13 @@ void connection::async_write_response( http::response &response,

binder->init(self(),complete_response,h,chunk.first);
booster::intrusive_ptr<booster::callable<void(booster::system::error_code const &,size_t)> > p(binder.get());
async_write( &(*chunk.first)[0],
chunk.second,
p);
return;
}
if(!complete_response) {
// request to send an empty block
service().impl().get_io_service().post(boost::bind(h,http::context::operation_completed));
return;
}
on_async_write_written(booster::system::error_code(),true,h); // < h will not be called when complete_response = true
}

void connection::on_async_write_written(booster::system::error_code const &e,bool complete_response,ehandler const &h)
{
if(e) {
BOOSTER_WARNING("cppcms") << "Writing response failed:" << e.message();
service().impl().get_io_service().post(boost::bind(h,http::context::operation_aborted));
return;
}
if(complete_response) {
async_write_eof(boost::bind(&connection::on_eof_written,self(),_1,h));
request_in_progress_=false;
async_write( booster::aio::buffer(&(*chunk.first)[0],chunk.second),p,complete_response);
return;
}
h(http::context::operation_completed);
}
void connection::async_complete_response(ehandler const &h)
{
async_write_eof(boost::bind(&connection::on_eof_written,self(),_1,h));
request_in_progress_=false;
// request to send an empty block
get_io_service().post(boost::bind(h,http::context::operation_completed));
}

void connection::complete_response()
{
write_eof();
}

void connection::on_eof_written(booster::system::error_code const &e,ehandler const &h)
{
if(e) { set_error(h,e.message()); return; }
h(http::context::operation_completed);
}


struct connection::reader {


+ 70
- 58
src/fastcgi_api.cpp View File

@@ -143,64 +143,102 @@ namespace cgi {
}
h(booster::system::error_code(),s);
}
public:
virtual void async_write(void const *p,size_t s,io_handler const &h)
public:
static void print(booster::aio::const_buffer const &in,bool eof)
{
std::pair<booster::aio::const_buffer::entry const *,size_t> r = in.get();
for(size_t i=0;i<r.second;i++) {
std::cout.write(r.first[i].ptr,r.first[i].size);
}
std::cout << (eof ? "EOF" : "---")<< std::endl;
}
virtual void async_write(booster::aio::const_buffer const &in,io_handler const &h,bool eof)
{
print(in,eof);
booster::system::error_code dummy;
do_write(p,s,h,true,dummy);
do_write(in,h,true,dummy,eof);
}
virtual size_t write(void const *buffer,size_t n,booster::system::error_code &e)
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &e,bool eof)
{
return do_write(buffer,n,io_handler(),false,e);
print(in,eof);
return do_write(in,io_handler(),false,e,eof);
}
virtual size_t do_write(void const *p,size_t s,io_handler const &h,bool async,booster::system::error_code &e)
virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool completed,booster::system::error_code &)
{
if(s==0) {
if(async)
socket_.get_io_service().post(h,booster::system::error_code(),0);
return 0;
}
io::const_buffer packet;
size_t reminder = s;
char const *ptr = static_cast<char const *>(p);
booster::aio::const_buffer packet;
booster::aio::const_buffer::entry const *chunks = in.get().first;
size_t reminder = in.bytes_count();
size_t in_size = reminder;
size_t chunk_consumed = 0;
while(reminder > 0) {
static char pad[8];
static const char pad[8]={0,0,0,0,0,0,0,0};
static const size_t max_packet_len = 65535;
size_t chunk = 0;
int pad_len = 0;
if(reminder > max_packet_len) {
if(s > max_packet_len && reminder == s) {
chunk = max_packet_len;
if(in_size > max_packet_len && reminder == in_size) {
// prepare only once
full_header_.version = fcgi_version_1;
full_header_.type=fcgi_stdout;
full_header_.request_id=request_id_;
full_header_.content_length = max_packet_len;
full_header_.padding_length = 1;
full_header_.padding_length = pad_len = 1;
full_header_.to_net();
}
packet += io::buffer(&full_header_,sizeof(full_header_));
packet += io::buffer(ptr,max_packet_len);
packet += io::buffer(pad,1);
ptr += max_packet_len;
reminder -= max_packet_len;
}
else {
else {
chunk = reminder;
memset(&header_,0,sizeof(header_));
header_.version=fcgi_version_1;
header_.type=fcgi_stdout;
header_.request_id=request_id_;
header_.content_length = reminder;
header_.padding_length =(8 - (reminder % 8)) % 8;
packet += io::buffer(&header_,sizeof(header_));
packet += io::buffer(ptr,reminder);
packet += io::buffer(pad,header_.padding_length);
header_.padding_length =pad_len = (8 - (reminder % 8)) % 8;
header_.to_net();
ptr += reminder;
reminder = 0;

packet += io::buffer(&header_,sizeof(header_));
}

reminder -= chunk;
while(chunk > 0) {
size_t next_size = chunks->size - chunk_consumed;
if(next_size > chunk)
next_size = chunk;

packet += io::buffer(chunks->ptr + chunk_consumed, next_size);
chunk_consumed += next_size;
chunk -= next_size;
if(chunk_consumed == chunks->size) {
chunks++;
chunk_consumed = 0;
}
}

packet += io::buffer(pad,pad_len);
}
if(completed) {
prepare_eof();
packet += io::buffer(&eof_,sizeof(eof_));
}
return packet;
}
virtual size_t do_write(booster::aio::const_buffer const &in,io_handler const &h,bool async,booster::system::error_code &e,bool eof)
{
size_t s = in.bytes_count();
io::const_buffer packet = format_output(in,eof,e);

if(e)
return 0;
if(async) {
if(packet.empty()) {
socket_.get_io_service().post(h,booster::system::error_code(),0);
return 0;
}
socket_.async_write(
packet,
boost::bind( h,
@@ -209,6 +247,8 @@ namespace cgi {
return s;
}
else {
if(packet.empty())
return 0;
booster::system::error_code err;
size_t res = socket_.write(packet,err);
if(err && io::basic_socket::would_block(err)) {
@@ -251,36 +291,8 @@ namespace cgi {
eof_.headers_[1].to_net();
eof_.record_.to_net();
}
virtual void write_eof()
{
prepare_eof();
booster::system::error_code e;
socket_.write(io::buffer(&eof_,sizeof(eof_)),e);
}
virtual void async_write_eof(handler const &h)
{
prepare_eof();
socket_.cancel();
socket_.async_write(
io::buffer(&eof_,sizeof(eof_)),
boost::bind( &fastcgi::on_written_eof,
self(),
_1,
h));
}

virtual void on_written_eof(booster::system::error_code const &e,handler const &h)
virtual void do_eof()
{
if(e) { h(e); return; }

// Stop reading from socket
if(!keep_alive_) {
booster::system::error_code err;
socket_.shutdown(io::stream_socket::shut_rdwr,err);
}

h(booster::system::error_code());
}

// This is not really correct because server may try


+ 60
- 59
src/http_api.cpp View File

@@ -310,33 +310,32 @@ namespace cgi {
}
socket_.async_read_some(io::buffer(p,s),h);
}
virtual void async_write_eof(handler const &h)
{
remove_from_watchdog();
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.get_io_service().post(h,booster::system::error_code());
}
virtual void write_eof()
virtual void do_eof()
{
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.close(e);
}
virtual void async_write(void const *p,size_t s,io_handler const &h)
virtual void async_write(booster::aio::const_buffer const &in,io_handler const &h,bool eof)
{
update_time();
watchdog_->add(self());
if(headers_done_)
socket_.async_write(io::buffer(p,s),h);
else
process_output_headers(p,s,h);
booster::system::error_code e;
booster::aio::const_buffer packet = format_output(in,eof,e);
if(e) {
socket_.get_io_service().post(h,e,0);
return;
}
socket_.async_write(packet,h);
}
virtual size_t write(void const *p,size_t n,booster::system::error_code &e)
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &e,bool eof)
{
if(headers_done_)
return write_to_socket(io::buffer(p,n),e);
return process_output_headers(p,n);

booster::aio::const_buffer packet = format_output(in,eof,e);
if(e) return 0;
write_to_socket(packet,e);
if(e) return 0;
return in.bytes_count();
}
#ifndef CPPCMS_NO_SO_SNDTIMO
size_t timed_write_some(booster::aio::const_buffer const &buf,booster::system::error_code &e)
@@ -443,9 +442,43 @@ namespace cgi {
socket_.async_read_some(io::buffer(&a,1),boost::bind(h));
}

virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool /*completed*/,booster::system::error_code &e)
{
if(headers_done_)
return in;
booster::aio::const_buffer packet;
std::pair<booster::aio::const_buffer::entry const *,size_t> tmp = in.get();
booster::aio::const_buffer::entry const *entry = tmp.first;
size_t parts = tmp.second;
while(parts > 0) {
bool r =process_output_headers(entry->ptr,entry->size,e);
parts--;
entry++;
if(r) {
if(!e)
headers_done_ = true;
break;
}
}
if(e)
return packet;
packet+= booster::aio::buffer(response_line_);
packet+= booster::aio::buffer(output_body_);
while(parts > 0) {
packet+= booster::aio::buffer(entry->ptr,entry->size);
parts --;
entry++;
}
return packet;
}

private:
size_t process_output_headers(void const *p,size_t s,io_handler const &h=io_handler())
bool process_output_headers(void const *p,size_t s,booster::system::error_code &e)
{
static char const *addon =
"Server: CppCMS-Embedded/" CPPCMS_PACKAGE_VERSION "\r\n"
"Connection: close\r\n";
char const *ptr=static_cast<char const *>(p);
output_body_.insert(output_body_.end(),ptr,ptr+s);

@@ -454,68 +487,36 @@ namespace cgi {
for(;;) {
switch(output_parser_.step()) {
case parser::more_data:
if(h)
h(booster::system::error_code(),s);
return s;
return false;
case parser::got_header:
{
char const *name="";
char const *value="";
if(!parse_single_header(output_parser_.header_,name,value)) {
h(booster::system::error_code(errc::protocol_violation,cppcms_category),s);
return s;
e=booster::system::error_code(errc::protocol_violation,cppcms_category);
return true;
}
if(strcmp(name,"STATUS")==0) {
response_line_ = "HTTP/1.0 ";
response_line_ +=value;
response_line_ +="\r\n";
return write_response(h,s);
response_line_.append(addon);
return true;
}
}
break;
case parser::end_of_headers:
response_line_ = "HTTP/1.0 200 Ok\r\n";
response_line_.append(addon);
return true;

return write_response(h,s);
case parser::error_observerd:
h(booster::system::error_code(errc::protocol_violation,cppcms_category),0);
return 0;
e=booster::system::error_code(errc::protocol_violation,cppcms_category);
return true;
}
}
}
size_t write_response(io_handler const &h,size_t s)
{
char const *addon =
"Server: CppCMS-Embedded/" CPPCMS_PACKAGE_VERSION "\r\n"
"Connection: close\r\n";

response_line_.append(addon);

booster::aio::const_buffer packet =
io::buffer(response_line_)
+ io::buffer(output_body_);
#ifdef DEBUG_HTTP_PARSER
std::cerr<<"["<<response_line_<<std::string(output_body_.begin(),output_body_.end())
<<"]"<<std::endl;
#endif
headers_done_=true;
if(!h) {
booster::system::error_code e;
write_to_socket(packet,e);
if(e) return 0;
return s;
}

socket_.async_write(packet,boost::bind(&http::do_write,self(),_1,h,s));
return s;
}

void do_write(booster::system::error_code const &e,io_handler const &h,size_t s)
{
if(e) { h(e,0); return; }
h(booster::system::error_code(),s);
}

void process_request(handler const &h)
{


+ 1
- 2
src/http_context.cpp View File

@@ -116,7 +116,6 @@ void context::on_request_ready(bool error)
void context::complete_response()
{
response().finalize();
conn_->complete_response();
if(conn_->is_reuseable()) {
booster::shared_ptr<context> cont(new context(conn_));
service().post(boost::bind(&context::run,cont));
@@ -191,7 +190,7 @@ void context::async_complete_response()
boost::bind(&context::try_restart,self(),_1));
return;
}
conn_->async_complete_response(boost::bind(&context::try_restart,self(),_1));
complete_response();
}

void context::try_restart(bool e)


+ 28
- 7
src/http_response.cpp View File

@@ -218,6 +218,7 @@ namespace details {
{
if(!opened_)
return;
pubsync();
if(out_) {
z_stream_.avail_in = 0;
z_stream_.next_in = 0;
@@ -282,36 +283,52 @@ namespace details {

class output_device : public basic_obuf<output_device> {
booster::weak_ptr<impl::cgi::connection> conn_;
bool final_;
bool eof_send_;
public:
output_device(impl::cgi::connection *conn,size_t n) :
basic_obuf<output_device>(n),
conn_(conn->shared_from_this())
conn_(conn->shared_from_this()),
final_(false),
eof_send_(false)
{
}
void close()
{
final_=true;
pubsync();
if(!eof_send_) {
do_write(booster::aio::const_buffer());
}
}
int write(char const *data,int n)
int do_write(booster::aio::const_buffer const &in)
{
if(n==0)
bool set_eof = final_ && !eof_send_;
if(in.empty() && !set_eof)
return 0;

booster::shared_ptr<impl::cgi::connection> c = conn_.lock();
if(!c)
return -1;
booster::system::error_code e;

int res = c->write(data,n,e);
int res = c->write(in,e,set_eof);
if(e) {
BOOSTER_WARNING("cppcms") << "Failed to write response:" << e.message();
conn_.reset();
return -1;
}
if(res!=n)
if(res!=int(in.bytes_count()))
return -1;
if(set_eof)
eof_send_ = true;
return 0;
}
int write(char const *data,int n)
{
return do_write(booster::aio::buffer(data,n));
}
};
}

@@ -405,12 +422,16 @@ void response::set_header(std::string const &name,std::string const &value)
void response::finalize()
{
if(!finalized_) {
out()<<std::flush;
out();
#ifndef CPPCMS_NO_GZIP
d->zbuf.close();
#endif
d->cached.close();
d->output_buf.close();

if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw)
d->buffered.close();
else
d->output_buf.close();
finalized_=1;
}
}


+ 10
- 12
src/scgi_api.cpp View File

@@ -113,22 +113,26 @@ namespace cgi {

h(booster::system::error_code());
}
virtual booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool /*comleted*/,booster::system::error_code &/*e*/)
{
return in;
}

virtual void async_read_some(void *p,size_t s,io_handler const &h)
{
socket_.async_read_some(io::buffer(p,s),h);
}
virtual void async_write(void const *p,size_t s,io_handler const &h)
virtual void async_write(booster::aio::const_buffer const &in,io_handler const &h,bool /*eof*/)
{
socket_.async_write(io::buffer(p,s),h);
socket_.async_write(in,h);
}
virtual size_t write(void const *buffer,size_t n,booster::system::error_code &e)
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &e,bool /*eof*/)
{
booster::system::error_code err;
size_t res = socket_.write(io::buffer(buffer,n),err);
size_t res = socket_.write(in,err);
if(err && io::basic_socket::would_block(err)) {
socket_.set_non_blocking(false);
return socket_.write_some(io::buffer(buffer,n),e);
return socket_.write_some(in,e);
}
else if(err) {
e=err;
@@ -146,18 +150,12 @@ namespace cgi {
return false;
}

virtual void write_eof()
virtual void do_eof()
{
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.close(e);
}
virtual void async_write_eof(handler const &h)
{
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.get_io_service().post(boost::bind(h,booster::system::error_code()));
}

virtual void async_read_eof(callback const &h)
{


+ 11
- 9
tests/dummy_api.h View File

@@ -24,6 +24,10 @@ public:
env_.add(pool_.add(p->first),pool_.add(p->second));
}

booster::aio::const_buffer format_output(booster::aio::const_buffer const &in,bool,booster::system::error_code &)
{
return in;
}
void async_read_headers(handler const &)
{
throw std::runtime_error("dummy_api: unsupported");
@@ -34,15 +38,17 @@ public:
throw std::runtime_error("dummy_api: unsupported");
}

void async_write(void const *,size_t,io_handler const &)
void async_write(booster::aio::const_buffer const &,io_handler const &,bool)
{
throw std::runtime_error("dummy_api: unsupported");
}
virtual void write_eof(){}
virtual size_t write(void const *p,size_t s,booster::system::error_code &)
virtual void do_eof(){}
virtual size_t write(booster::aio::const_buffer const &in,booster::system::error_code &,bool )
{
output_->append(reinterpret_cast<char const *>(p),s);
return s;
std::pair<booster::aio::const_buffer::entry const *,size_t> all=in.get();
for(size_t i=0;i<all.second;i++)
output_->append(reinterpret_cast<char const *>(all.first[i].ptr),all.first[i].size);
return in.bytes_count();
}
virtual booster::aio::io_service &get_io_service()
{
@@ -57,10 +63,6 @@ public:
{
throw std::runtime_error("dummy_api: unsupported");
}
virtual void async_write_eof(handler const &)
{
throw std::runtime_error("dummy_api: unsupported");
}
private:
std::string *output_;



Loading…
Cancel
Save