Browse Source

Code reorganization, major performance improvements

master
Artyom Beilis 11 years ago
parent
commit
c85e5e0a5c
14 changed files with 350 additions and 198 deletions
  1. +15
    -0
      booster/booster/aio/buffer.h
  2. +10
    -0
      booster/booster/aio/stream_socket.h
  3. +54
    -4
      booster/lib/aio/src/stream_socket.cpp
  4. +13
    -0
      booster/lib/aio/test/test_socket.cpp
  5. +3
    -1
      cppcms/http_response.h
  6. +11
    -7
      private/cgi_api.h
  7. +1
    -1
      private/http_parser.h
  8. +56
    -69
      src/cgi_api.cpp
  9. +44
    -21
      src/fastcgi_api.cpp
  10. +95
    -76
      src/http_api.cpp
  11. +6
    -3
      src/http_context.cpp
  12. +36
    -10
      src/http_response.cpp
  13. +4
    -4
      src/scgi_api.cpp
  14. +2
    -2
      tests/dummy_api.h

+ 15
- 0
booster/booster/aio/buffer.h View File

@@ -85,6 +85,21 @@ namespace booster {
{
return size_ == 0;
}
size_t size()
{
return size_;
}
size_t bytes_count() const
{
if(size_ == 0)
return 0;
if(size_ == 1)
return entry_.size;
size_t n = 0;
for(size_t i=0;i<vec_.size();i++)
n+=vec_[i].size;
return n;
}
private:
int size_;
entry entry_;


+ 10
- 0
booster/booster/aio/stream_socket.h View File

@@ -193,6 +193,16 @@ namespace aio {
///
void async_write(const_buffer const &buffer,io_handler const &h);

///
/// Return a number of avalible bytes to read, if error occurs returns 0 and e set to the error code
///
size_t bytes_readable(booster::system::error_code &e);

///
/// Return a number of avalible bytes to read, if error occurs system_error is thrown
///
size_t bytes_readable();

private:
int readv(mutable_buffer const &b);
int writev(const_buffer const &b);


+ 54
- 4
booster/lib/aio/src/stream_socket.cpp View File

@@ -9,6 +9,10 @@
#include "socket_details.h"
#include <booster/aio/socket.h>

#ifndef BOOSTER_WIN32
#include <sys/ioctl.h>
#endif

//#define BOOSTER_AIO_FORCE_POLL


@@ -407,9 +411,9 @@ namespace {
struct writer_all : public callable<void(system::error_code const &e)>
{
typedef intrusive_ptr<writer_all> pointer;
writer_all(stream_socket *s,const_buffer const &b,io_handler const &handler) :
buf(b),
count(0),
writer_all(stream_socket *s,const_buffer const &b,size_t n,io_handler const &handler) :
buf(b + n),
count(n),
self(s),
h(handler)
{
@@ -533,10 +537,56 @@ void stream_socket::async_write(const_buffer const &buffer,io_handler const &h)
{
if(!dont_block(h))
return;
writer_all::pointer r(new writer_all(this,buffer,h));

#ifdef BOOSTER_AIO_FORCE_POLL
writer_all::pointer r(new writer_all(this,buffer,0,h));
r->run();
#else
system::error_code e;
size_t n = write_some(buffer,e);
if((!e && n!=buffer.bytes_count()) || (e && would_block(e))) {
writer_all::pointer r(new writer_all(this,buffer,n,h));
r->run();
}
else {
get_io_service().post(h,e,n);
}
#endif




}

size_t stream_socket::bytes_readable(booster::system::error_code &e)
{
#ifdef BOOSTER_WIN32
unsigned long size = 0;
int r = ::ioctlsocket(native(),FIONREAD,&size);
if(r != 0) {
e=geterror();
return 0;
}
return size;
#else
int size = 0;
int r = ::ioctl(native(),FIONREAD,&size);
if(r < 0) {
e=geterror();
return 0;
}
return size;
#endif
}

size_t stream_socket::bytes_readable()
{
booster::system::error_code e;
size_t r = bytes_readable(e);
if(e)
throw booster::system::system_error(e);
return r;
}

void socket_pair(stream_socket &s1,stream_socket &s2,system::error_code &e)
{


+ 13
- 0
booster/lib/aio/test/test_socket.cpp View File

@@ -59,17 +59,27 @@ void test_buffer()
char const *s3="foobar";
char const *s4="bee";
TEST(b.empty());
TEST(b.size()==0);
TEST(b.bytes_count() == 0);
b.add(s2,0);
TEST(b.empty());
TEST(b.size() == 0);
TEST(b.bytes_count() == 0);
b.add(s1,4);
TEST(b.size() == 1);
TEST(b.bytes_count() == 4);
TEST(!b.empty());
TEST(b.get().second==1);
TEST(b.get().first[0].ptr==s1 && b.get().first[0].size==4);
b.add(s3,6);
TEST(b.size() == 2);
TEST(b.bytes_count() == 10);
TEST(b.get().second==2);
TEST(b.get().first[0].ptr==s1 && b.get().first[0].size==4);
TEST(b.get().first[1].ptr==s3 && b.get().first[1].size==6);
b.add(s4,3);
TEST(b.size() == 3);
TEST(b.bytes_count() == 13);
TEST(b.get().second==3);
TEST(b.get().first[0].ptr==s1 && b.get().first[0].size==4);
TEST(b.get().first[1].ptr==s3 && b.get().first[1].size==6);
@@ -152,10 +162,13 @@ void basic_io()
std::string str1="hello";
io::stream_socket s1,s2;
make_pair(s1,s2);
TEST(s2.bytes_readable()==0);
TEST(s1.write_some(booster::aio::buffer(str1))==5);
TEST(s2.bytes_readable()==5);
char buf[16] = {0};
TEST(s2.read_some(booster::aio::buffer(buf,sizeof(buf)))==5);
TEST(str1==buf);
TEST(s2.bytes_readable()==0);
str1="x";
TEST(s2.write_some(io::buffer(str1))==1);
TEST(s1.read_some(io::buffer(buf,sizeof(buf)))==1 && buf[0]=='x');


+ 3
- 1
cppcms/http_response.h View File

@@ -316,7 +316,9 @@ namespace http {
std::pair<char const *,size_t> output();

void write_http_headers(std::ostream &);
std::string get_async_chunk();

typedef std::pair<booster::shared_ptr<std::vector<char> >,size_t> chunk_type;
chunk_type get_async_chunk();

struct _data;
booster::hold_ptr<_data> d;


+ 11
- 7
private/cgi_api.h View File

@@ -15,6 +15,7 @@
#include <map>
#include <booster/callback.h>
#include <booster/system_error.h>
#include <cppcms/http_context.h>

#include <cppcms/defs.h>
#include <cppcms/config.h>
@@ -46,7 +47,7 @@ namespace cgi {
typedef booster::callback<void(booster::system::error_code const &e)> handler;
typedef booster::callback<void(booster::system::error_code const &e,size_t)> io_handler;
typedef booster::callback<void()> callback;
typedef booster::callback<void(bool)> ehandler;
typedef cppcms::http::context::handler ehandler;

class connection;
class acceptor : public booster::noncopyable {
@@ -103,17 +104,20 @@ namespace cgi {
}
return map_env_;
}
size_t write(void const *data,size_t n,booster::system::error_code &e);
bool is_reuseable();
std::string last_error();
protected:

/****************************************************************************/

// These are abstract member function that should be implemented by
// actual protocol like FCGI, SCGI, HTTP or CGI
public:
virtual void async_write(void const *,size_t,io_handler const &h) = 0;
virtual size_t write(void const *,size_t,booster::system::error_code &e) = 0;
protected:


virtual void async_read_headers(handler const &h) = 0;
virtual bool keep_alive() = 0;
@@ -123,10 +127,8 @@ namespace cgi {
virtual void async_read_some(void *,size_t,io_handler const &h) = 0;
virtual void on_async_read_complete() {}
virtual void async_read_eof(callback const &h) = 0;
virtual void async_write_some(void const *,size_t,io_handler const &h) = 0;
virtual void async_write_eof(handler const &h) = 0;
virtual void write_eof() = 0;
virtual size_t write_some(void const *,size_t,booster::system::error_code &e) = 0;
virtual booster::aio::io_service &get_io_service() = 0;

/****************************************************************************/
@@ -138,15 +140,15 @@ namespace cgi {

booster::shared_ptr<connection> self();
void async_read(void *,size_t,io_handler const &h);
void async_write(void const *,size_t,io_handler const &h);
private:

struct reader;
struct writer;
struct cgi_forwarder;
struct async_write_binder;

friend struct reader;
friend struct writer;
friend struct async_write_binder;
friend struct cgi_forwarder;

void set_error(ehandler const &h,std::string s);
@@ -171,6 +173,8 @@ namespace cgi {

std::map<std::string,std::string> map_env_;

booster::intrusive_ptr<async_write_binder> cached_async_write_binder_;

};




+ 1
- 1
private/http_parser.h View File

@@ -71,7 +71,7 @@ public:
body_(body),
body_ptr_(body_ptr)
{
header_.reserve(512);
header_.reserve(32);
}
void reset()
{


+ 56
- 69
src/cgi_api.cpp View File

@@ -179,7 +179,7 @@ booster::shared_ptr<connection> connection::self()
}

void connection::async_prepare_request( http::context *context,
booster::callback<void(bool)> const &h)
ehandler const &h)
{
async_read_headers(boost::bind(&connection::on_headers_read,self(),_1,context,h));
}
@@ -198,7 +198,7 @@ void connection::on_headers_read(booster::system::error_code const &e,http::cont
if(addr.second != 0 && !addr.first.empty()) {
booster::shared_ptr<cgi_forwarder> f(new cgi_forwarder(self(),addr.first,addr.second));
f->async_run();
h(true);
h(http::context::operation_aborted);
return;
}
context->request().prepare();
@@ -220,7 +220,7 @@ void connection::handle_eof(callback const &on_eof)
void connection::set_error(ehandler const &h,std::string s)
{
error_=s;
h(true);
h(http::context::operation_aborted);
}

void connection::handle_http_error(int code,http::context *context,ehandler const &h)
@@ -384,7 +384,7 @@ void connection::on_some_multipart_read(booster::system::error_code const &e,siz
}
context->request().set_post_data(files);
multipart_parser_.reset();
h(false);
h(http::context::operation_completed);
return;
}
else if (r==multipart_parser::parsing_error) {
@@ -416,7 +416,7 @@ void connection::on_post_data_loaded(booster::system::error_code const &e,http::
if(e) { set_error(h,e.message()); return; }
context->request().set_post_data(content_);
on_async_read_complete();
h(false);
h(http::context::operation_completed);
}

bool connection::is_reuseable()
@@ -429,33 +429,69 @@ std::string connection::last_error()
return error_;
}

struct connection::async_write_binder : public booster::callable<void(booster::system::error_code const &,size_t)> {
typedef booster::shared_ptr<cppcms::impl::cgi::connection> self_type;
self_type self;
ehandler h;
bool complete_response;
booster::shared_ptr<std::vector<char> > block;
async_write_binder() :complete_response(false) {}
void init(self_type c,bool comp,ehandler const &hnd,booster::shared_ptr<std::vector<char> > const &b)
{
self=c;
complete_response = comp;
h = hnd;
block = b;
}
void reset()
{
h=ehandler();
self.reset();
complete_response = false;
block.reset();
}
void operator()(booster::system::error_code const &e,size_t)
{
self->on_async_write_written(e,complete_response,h);
if(!self->cached_async_write_binder_) {
self->cached_async_write_binder_ = this;
reset();
}
}
};


void connection::async_write_response( http::response &response,
bool complete_response,
ehandler const &h)
{
async_chunk_=response.get_async_chunk();
if(!async_chunk_.empty()) {
async_write( async_chunk_.data(),
async_chunk_.size(),
boost::bind( &connection::on_async_write_written,
self(),
_1,
complete_response,
h));
http::response::chunk_type chunk = response.get_async_chunk();
if(chunk.second > 0) {
booster::intrusive_ptr<async_write_binder> binder;
binder.swap(cached_async_write_binder_);
if(!binder)
binder = new async_write_binder();

binder->init(self(),complete_response,h,chunk.first);
booster::intrusive_ptr<booster::callable<void(booster::system::error_code const &,size_t)> > p(binder.get());
async_write( &(*chunk.first)[0],
chunk.second,
p);
return;
}
if(complete_response) {
on_async_write_written(booster::system::error_code(),complete_response,h);
if(!complete_response) {
// request to send an empty block
service().impl().get_io_service().post(boost::bind(h,http::context::operation_completed));
return;
}
service().impl().get_io_service().post(boost::bind(h,false));
on_async_write_written(booster::system::error_code(),true,h); // < h will not be called when complete_response = true
}

void connection::on_async_write_written(booster::system::error_code const &e,bool complete_response,ehandler const &h)
{
if(e) {
BOOSTER_WARNING("cppcms") << "Writing response failed:" << e.message();
service().impl().get_io_service().post(boost::bind(h,true));
service().impl().get_io_service().post(boost::bind(h,http::context::operation_aborted));
return;
}
if(complete_response) {
@@ -463,7 +499,7 @@ void connection::on_async_write_written(booster::system::error_code const &e,boo
request_in_progress_=false;
return;
}
service().impl().get_io_service().post(boost::bind(h,false));
h(http::context::operation_completed);
}
void connection::async_complete_response(ehandler const &h)
{
@@ -479,7 +515,7 @@ void connection::complete_response()
void connection::on_eof_written(booster::system::error_code const &e,ehandler const &h)
{
if(e) { set_error(h,e.message()); return; }
h(false);
h(http::context::operation_completed);
}


@@ -508,31 +544,6 @@ struct connection::reader {
conn->async_read_some(p,s,*this);
}
};
struct connection::writer {
writer(connection *C,io_handler const &H,size_t S,char const *P) : h(H), s(S), p(P),conn(C)
{
done=0;
}
io_handler h;
size_t s;
size_t done;
char const *p;
connection *conn;
void operator() (booster::system::error_code const &e=booster::system::error_code(),size_t wr = 0)
{
if(e) {
h(e,done+wr);
return;
}
s-=wr;
p+=wr;
done+=wr;
if(s==0)
h(booster::system::error_code(),done);
else
conn->async_write_some(p,s,*this);
}
};

void connection::async_read(void *p,size_t s,io_handler const &h)
{
@@ -540,30 +551,6 @@ void connection::async_read(void *p,size_t s,io_handler const &h)
r();
}

void connection::async_write(void const *p,size_t s,io_handler const &h)
{
writer w(this,h,s,(char const *)p);
w();
}

size_t connection::write(void const *data,size_t n,booster::system::error_code &e)
{
char const *p=reinterpret_cast<char const *>(data);
size_t wr=0;
while(n > 0) {
size_t d=write_some(p,n,e);
if(d == 0)
return wr;
p+=d;
wr+=d;
n-=d;
if(e)
return wr;
}
return wr;
}


} // cgi
} // impl
} // cppcms

+ 44
- 21
src/fastcgi_api.cpp View File

@@ -144,40 +144,62 @@ namespace cgi {
h(booster::system::error_code(),s);
}
public:
virtual void async_write_some(void const *p,size_t s,io_handler const &h)
virtual void async_write(void const *p,size_t s,io_handler const &h)
{
booster::system::error_code dummy;
do_write_some(p,s,h,true,dummy);
do_write(p,s,h,true,dummy);
}
virtual size_t write_some(void const *buffer,size_t n,booster::system::error_code &e)
virtual size_t write(void const *buffer,size_t n,booster::system::error_code &e)
{
return do_write_some(buffer,n,io_handler(),false,e);
return do_write(buffer,n,io_handler(),false,e);
}
virtual size_t do_write_some(void const *p,size_t s,io_handler const &h,bool async,booster::system::error_code &e)
virtual size_t do_write(void const *p,size_t s,io_handler const &h,bool async,booster::system::error_code &e)
{
if(s==0) {
if(async)
socket_.get_io_service().post(boost::bind(h,booster::system::error_code(),0));
socket_.get_io_service().post(h,booster::system::error_code(),0);
return 0;
}
memset(&header_,0,sizeof(header_));
header_.version=fcgi_version_1;
header_.type=fcgi_stdout;
header_.request_id=request_id_;
if(s > 65535) s=65535;
header_.content_length =s;
header_.padding_length =(8 - (s % 8)) % 8;
static char pad[8];
io::const_buffer packet;
size_t reminder = s;
char const *ptr = static_cast<char const *>(p);
while(reminder > 0) {
static char pad[8];
static const size_t max_packet_len = 65535;
if(reminder > max_packet_len) {
if(s > max_packet_len && reminder == s) {
// prepare only once
full_header_.version = fcgi_version_1;
full_header_.type=fcgi_stdout;
full_header_.request_id=request_id_;
full_header_.content_length = max_packet_len;
full_header_.padding_length = 1;
full_header_.to_net();
}
packet += io::buffer(&full_header_,sizeof(full_header_));
packet += io::buffer(ptr,max_packet_len);
packet += io::buffer(pad,1);
ptr += max_packet_len;
reminder -= max_packet_len;
}
else {
memset(&header_,0,sizeof(header_));
header_.version=fcgi_version_1;
header_.type=fcgi_stdout;
header_.request_id=request_id_;
header_.content_length = reminder;
header_.padding_length =(8 - (reminder % 8)) % 8;
packet += io::buffer(&header_,sizeof(header_));
packet += io::buffer(ptr,reminder);
packet += io::buffer(pad,header_.padding_length);
header_.to_net();
ptr += reminder;
reminder = 0;
}
}
header_.to_net();

io::const_buffer packet =
io::buffer(&header_,sizeof(header_))
+ io::buffer(p,s)
+ io::buffer(pad,header_.padding_length);

if(async) {
socket_.async_write(
packet,
@@ -726,6 +748,7 @@ namespace cgi {
io::stream_socket socket_;

fcgi_header header_;
fcgi_header full_header_;
std::vector<char> body_;




+ 95
- 76
src/http_api.cpp View File

@@ -132,6 +132,7 @@ namespace cgi {
time_to_die_(0),
timeout_(0),
sync_option_is_set_(false),
in_watchdog_(false),
watchdog_(wd),
rewrite_(rw)
{
@@ -153,20 +154,6 @@ namespace cgi {
socket_.shutdown(io::stream_socket::shut_rdwr,e);
}
}
struct binder {
void operator()(booster::system::error_code const &e,size_t n) const
{
self_->some_headers_data_read(e,n,h_);
}
binder(booster::shared_ptr<http> self,handler const &h) :
self_(self),
h_(h)
{
}
private:
booster::shared_ptr<http> self_;
handler h_;
};

void update_time()
{
@@ -200,23 +187,33 @@ namespace cgi {

void async_read_some_headers(handler const &h)
{

input_body_.reserve(8192);
input_body_.resize(8192,0);
input_body_ptr_=0;
socket_.async_read_some(io::buffer(input_body_),binder(self(),h));
socket_.on_readable(boost::bind(&http::some_headers_data_read,self(),_1,h));
update_time();
}
virtual void async_read_headers(handler const &h)
{
update_time();
watchdog_->add(self());
add_to_watchdog();
async_read_some_headers(h);
}

void some_headers_data_read(booster::system::error_code const &e,size_t n,handler const &h)
void some_headers_data_read(booster::system::error_code const &er,handler const &h)
{
if(e) { h(e); return; }
if(er) { h(er); return; }

booster::system::error_code e;
size_t n = socket_.bytes_readable(e);
if(e) { h(e); return ; }

if(n > 16384)
n=16384;
if(input_body_.capacity() < n) {
input_body_.reserve(n);
}
input_body_.resize(input_body_.capacity(),0);
input_body_ptr_=0;
n = socket_.read_some(booster::aio::buffer(input_body_),e);

total_read_+=n;
if(total_read_ > 16384) {
@@ -304,14 +301,18 @@ namespace cgi {
socket_.get_io_service().post(boost::bind(h,booster::system::error_code(),s));
return;
}
if(input_body_.capacity()!=0) {
std::vector<char> v;
input_body_.swap(v);
}
socket_.async_read_some(io::buffer(p,s),h);
}
virtual void async_write_eof(handler const &h)
{
watchdog_->remove(self());
remove_from_watchdog();
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.get_io_service().post(boost::bind(h,booster::system::error_code()));
socket_.get_io_service().post(h,booster::system::error_code());
}
virtual void write_eof()
{
@@ -319,29 +320,29 @@ namespace cgi {
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.close(e);
}
virtual void async_write_some(void const *p,size_t s,io_handler const &h)
virtual void async_write(void const *p,size_t s,io_handler const &h)
{
update_time();
watchdog_->add(self());
if(headers_done_)
socket_.async_write_some(io::buffer(p,s),h);
socket_.async_write(io::buffer(p,s),h);
else
process_output_headers(p,s,h);
}
virtual size_t write_some(void const *p,size_t n,booster::system::error_code &e)
virtual size_t write(void const *p,size_t n,booster::system::error_code &e)
{
if(headers_done_)
return write_some_to_socket(io::buffer(p,n),e);
return write_to_socket(io::buffer(p,n),e);
return process_output_headers(p,n);
}
#ifndef CPPCMS_NO_SO_SNDTIMO
// using SO_SNDTIMO on almost all platforms
size_t write_some_to_socket(booster::aio::const_buffer const &buf,booster::system::error_code &e)
size_t write_to_socket(booster::aio::const_buffer const &buf,booster::system::error_code &e)
{
set_sync_options(e);
if(e) return 0;
booster::ptime start = booster::ptime::now();
size_t n = socket_.write_some(buf,e);
size_t n = socket_.write(buf,e);
booster::ptime end = booster::ptime::now();
// it may actually return with success but return small
// a small buffer
@@ -363,38 +364,45 @@ namespace cgi {
}
#else
// we need to fallback to poll, fortunatelly only on some Unixes like Solaris, so we can use poll(2)
size_t write_some_to_socket(booster::aio::const_buffer const &buf,booster::system::error_code &e)
{
size_t n = socket_.write_some(buf,e);
if(!e || !io::basic_socket::would_block(e))
return n;
booster::ptime start = booster::ptime::now();

pollfd pfd=pollfd();
pfd.fd = socket_.native();
pfd.events = POLLOUT;
int msec = timeout_ * 1000;
int msec_total = msec;
for(;;) {
int r = poll(&pfd,1,msec);

// handle restart after EINTR
if(r < 0) {
if(errno == EINTR) {
int passed = int(booster::ptime::to_number(booster::ptime::now() - start)*1000);
msec = msec_total - passed;
if(msec < 0)
msec = 0;
continue;
}
e = booster::system::error_code(errno,booster::system::system_category);
return 0;
size_t write_to_socket(booster::aio::const_buffer const &bufin,booster::system::error_code &e)
{
booster::aio::const_buffer buf = bufin;
size_t total = 0;
while(!buf.empty()) {
size_t n = socket_.write_some(buf,e);
if(!e || !io::basic_socket::would_block(e)) {
buf += n
total += n;
continue;
}
else if(r == 0) {
// timeout :-(
e=booster::system::error_code(errc::protocol_violation,cppcms_category);
die();
return 0;
booster::ptime start = booster::ptime::now();

pollfd pfd=pollfd();
pfd.fd = socket_.native();
pfd.events = POLLOUT;
int msec = timeout_ * 1000;
int msec_total = msec;
for(;;) {
int r = poll(&pfd,1,msec);

// handle restart after EINTR
if(r < 0) {
if(errno == EINTR) {
int passed = int(booster::ptime::to_number(booster::ptime::now() - start)*1000);
msec = msec_total - passed;
if(msec < 0)
msec = 0;
continue;
}
e = booster::system::error_code(errno,booster::system::system_category);
return total;
}
else if(r == 0) {
// timeout :-(
e=booster::system::error_code(errc::protocol_violation,cppcms_category);
die();
return total;
}
}
// check if we can write
if(pfd.revents & POLLOUT) {
@@ -403,23 +411,18 @@ namespace cgi {
// restart polling if we get would_block again
if(n == 0 && io::basic_socket::would_block(e))
continue;
return n;
buf += n;
total += n;
}
e=booster::system::error_code(booster::aio::aio_error::select_failed,booster::aio::aio_error_cat);
return 0;
}
else {
e=booster::system::error_code(
booster::aio::aio_error::select_failed,
booster::aio::aio_error_cat);
return total;
}
} // while
}
#endif
size_t write_to_socket(booster::aio::const_buffer buf,booster::system::error_code &e)
{
size_t res = 0;
while(!buf.empty() && !e) {
size_t n = write_some_to_socket(buf,e);
buf += n;
res += n;
}
return res;
}
void set_sync_options(booster::system::error_code &e)
{
if(!sync_option_is_set_) {
@@ -611,7 +614,7 @@ namespace cgi {
}
void on_async_read_complete()
{
watchdog_->remove(self());
remove_from_watchdog();
}
void error_response(char const *message,handler const &h)
{
@@ -687,6 +690,22 @@ namespace cgi {
time_t time_to_die_;
int timeout_;
bool sync_option_is_set_;
bool in_watchdog_;

void add_to_watchdog()
{
if(!in_watchdog_) {
watchdog_->add(self());
in_watchdog_ = true;
}
}
void remove_from_watchdog()
{
if(in_watchdog_) {
watchdog_->remove(self());
in_watchdog_ = false;
}
}

booster::shared_ptr<http_watchdog> watchdog_;
booster::shared_ptr<url_rewriter> rewrite_;


+ 6
- 3
src/http_context.cpp View File

@@ -57,8 +57,6 @@ context::context(booster::shared_ptr<impl::cgi::connection> conn) :
d.reset(new _data(*this));
d->response.reset(new http::response(*this));
skin(service().views_pool().default_skin());
d->cache.reset(new cache_interface(*this));
d->session.reset(new session_interface(*this));
}

std::string context::skin()
@@ -68,6 +66,9 @@ std::string context::skin()

cache_interface &context::cache()
{
if(!d->cache.get()) {
d->cache.reset(new cache_interface(*this));
}
return *d->cache;
}

@@ -177,7 +178,7 @@ void context::async_flush_output(context::handler const &h)
conn_->async_write_response(
response(),
false,
boost::bind(wrapper,h,_1));
h);
}

void context::async_complete_response()
@@ -259,6 +260,8 @@ void context::locale(std::string const &name)
}
session_interface &context::session()
{
if(!d->session.get())
d->session.reset(new session_interface(*this));
return *d->session;
}



+ 36
- 10
src/http_response.cpp View File

@@ -63,12 +63,27 @@ namespace details {
{
out_ = out;
}
size_t getstr(booster::shared_ptr<std::vector<char> > &buf)
{
size_t n = buffer_.size() - (epptr() - pptr());
setp(0,0);
if(!borrowed_buffer_) {
borrowed_buffer_.reset(new std::vector<char>());
}
borrowed_buffer_->swap(buffer_);
buf = borrowed_buffer_;
return n;
}
void getstr(std::string &out)
{
buffer_.resize(buffer_.size() - (epptr() - pptr()));
size_t n = buffer_.size() - (epptr() - pptr());
setp(0,0);
buffer_.swap(out);
buffer_.clear();
if(n!=0) {
out.assign(&buffer_[0],n);
}
else {
out.clear();
}
}
int overflow(int c)
{
@@ -81,7 +96,14 @@ namespace details {
r=-1;
}
if(pptr() == 0) {
buffer_.resize(1024);
if(buffer_.empty()) {
if(borrowed_buffer_ && borrowed_buffer_.unique() && borrowed_buffer_->size()!=0) {
buffer_.swap(*borrowed_buffer_);
}
else {
buffer_.resize(128);
}
}
setp(&buffer_[0],&buffer_[0]+buffer_.size());
}
else if(pptr()==epptr()) {
@@ -106,7 +128,8 @@ namespace details {
out_ = 0;
}
private:
std::string buffer_;
booster::shared_ptr<std::vector<char> > borrowed_buffer_;
std::vector<char> buffer_;
std::streambuf *out_;
};

@@ -118,7 +141,7 @@ namespace details {
{
if(n==0)
n=1024;
buf_.resize(n);
expected_size_ = n;
}
Self &self()
{
@@ -132,6 +155,8 @@ namespace details {
r = EOF;
}
}
if(buf_.empty())
buf_.resize(expected_size_);
char *begin = &buf_[0];
char *end = begin + buf_.size();
setp(begin,end);
@@ -144,6 +169,7 @@ namespace details {
return overflow(EOF);
}
private:
size_t expected_size_;
std::vector<char> buf_;
};

@@ -523,11 +549,11 @@ std::ostream &response::out()
return d->output;
}

std::string response::get_async_chunk()
response::chunk_type response::get_async_chunk()
{
std::string result;
d->buffered.getstr(result);
return result;
chunk_type c;
c.second = d->buffered.getstr(c.first);
return c;
}

bool response::some_output_was_written()


+ 4
- 4
src/scgi_api.cpp View File

@@ -118,14 +118,14 @@ namespace cgi {
{
socket_.async_read_some(io::buffer(p,s),h);
}
virtual void async_write_some(void const *p,size_t s,io_handler const &h)
virtual void async_write(void const *p,size_t s,io_handler const &h)
{
socket_.async_write_some(io::buffer(p,s),h);
socket_.async_write(io::buffer(p,s),h);
}
virtual size_t write_some(void const *buffer,size_t n,booster::system::error_code &e)
virtual size_t write(void const *buffer,size_t n,booster::system::error_code &e)
{
booster::system::error_code err;
size_t res = socket_.write_some(io::buffer(buffer,n),err);
size_t res = socket_.write(io::buffer(buffer,n),err);
if(err && io::basic_socket::would_block(err)) {
socket_.set_non_blocking(false);
return socket_.write_some(io::buffer(buffer,n),e);


+ 2
- 2
tests/dummy_api.h View File

@@ -34,12 +34,12 @@ public:
throw std::runtime_error("dummy_api: unsupported");
}

void async_write_some(void const *,size_t,io_handler const &)
void async_write(void const *,size_t,io_handler const &)
{
throw std::runtime_error("dummy_api: unsupported");
}
virtual void write_eof(){}
virtual size_t write_some(void const *p,size_t s,booster::system::error_code &)
virtual size_t write(void const *p,size_t s,booster::system::error_code &)
{
output_->append(reinterpret_cast<char const *>(p),s);
return s;


Loading…
Cancel
Save