Browse Source

Added improved output buffering

master
Artyom Beilis 8 years ago
parent
commit
6cf5526a85
8 changed files with 276 additions and 68 deletions
  1. +4
    -3
      booster/lib/aio/src/basic_socket.cpp
  2. +6
    -2
      cppcms/http_response.h
  3. +2
    -0
      private/cached_settings.h
  4. +4
    -12
      src/cgi_api.cpp
  5. +24
    -1
      src/fastcgi_api.cpp
  6. +7
    -3
      src/http_api.cpp
  7. +221
    -46
      src/http_response.cpp
  8. +8
    -1
      src/scgi_api.cpp

+ 4
- 3
booster/lib/aio/src/basic_socket.cpp View File

@@ -88,10 +88,11 @@ endpoint basic_socket::remote_endpoint(system::error_code &e)
std::vector<char> endpoint_raw_(1000,0);
sockaddr *sa = reinterpret_cast<sockaddr *>(&endpoint_raw_.front());
socklen_t len = endpoint_raw_.size();
if(::getpeername(native(),sa,&len) < 0)
e=geterror();
endpoint ep;
ep.raw(sa,len);
if(::getpeername(native(),sa,&len) < 0)
e=geterror();
else
ep.raw(sa,len);
return ep;
}



+ 6
- 2
cppcms/http_response.h View File

@@ -16,6 +16,11 @@
#include <iostream>
#include <cppcms/cstdint.h>

namespace booster{
namespace system {
class error_code;
}
}
namespace cppcms {
class cache_interface;
namespace impl { namespace cgi { class connection; }}
@@ -323,8 +328,7 @@ namespace http {

void write_http_headers(std::ostream &);

typedef std::pair<booster::shared_ptr<std::vector<char> >,size_t> chunk_type;
chunk_type get_async_chunk();
int flush_async_chunk(booster::system::error_code &e);

struct _data;
booster::hold_ptr<_data> d;


+ 2
- 0
private/cached_settings.h View File

@@ -55,6 +55,7 @@ namespace impl {
std::string ip;
int port;
int output_buffer_size;
int async_output_buffer_size;
bool disable_xpowered_by;
bool generate_http_headers;
int worker_threads;
@@ -64,6 +65,7 @@ namespace impl {
ip = v.get("service.ip","127.0.0.1");
port = v.get("service.port",8080);
output_buffer_size = v.get("service.output_buffer_size",16384);
async_output_buffer_size = v.get("service.async_output_buffer_size",1024);
disable_xpowered_by = v.get("service.disable_xpowered_by",false);
unsigned cpus = booster::thread::hardware_concurrency();
if(cpus == 0)


+ 4
- 12
src/cgi_api.cpp View File

@@ -137,6 +137,7 @@ namespace cppcms { namespace impl { namespace cgi {

void cleanup()
{
conn_->do_eof();
booster::system::error_code e;
scgi_.shutdown(booster::aio::stream_socket::shut_rdwr,e);
scgi_.close(e);
@@ -468,20 +469,11 @@ void connection::async_write_response( http::response &response,
{
booster::intrusive_ptr<async_write_binder> tmp = get_write_binder(h,complete_response);
booster::system::error_code e;
http::response::chunk_type chunk = response.get_async_chunk();
if(chunk.second > 0 || complete_response || has_pending()) {
booster::aio::const_buffer out;
if(chunk.second > 0)
out = booster::aio::buffer(&chunk.first->front(),chunk.second);
if(nonblocking_write(out,complete_response,e) || e) {
get_io_service().post(tmp,e);
return;
}
async_write(booster::aio::const_buffer(),false,tmp);
if(response.flush_async_chunk(e)!=0 || !has_pending()) {
get_io_service().post(tmp,e);
return;
}
// request to send an empty block
get_io_service().post(tmp,e);
async_write(booster::aio::const_buffer(),false,tmp);
}

bool connection::has_pending()


+ 24
- 1
src/fastcgi_api.cpp View File

@@ -17,6 +17,7 @@
#include <string.h>
#include <algorithm>
#include <iostream>
#include <stdio.h>
#include <cppcms/config.h>

#include <booster/aio/buffer.h>
@@ -49,11 +50,13 @@ namespace cgi {


template<typename API,typename Factory> class socket_acceptor;

class fastcgi : public connection {
public:
fastcgi(cppcms::service &srv) :
connection(srv),
socket_(srv.impl().get_io_service())
socket_(srv.impl().get_io_service()),
eof_callback_(false)
{
reset_all();
int procs=srv.procs_no();
@@ -149,6 +152,21 @@ namespace cgi {
{
booster::aio::const_buffer packet;
booster::aio::const_buffer::entry const *chunks = in.get().first;
#ifdef DEBUG_FASTCGI
{
size_t n=in.get().second;
printf("Format output of %d:\n",int(in.bytes_count()));
for(size_t i=0;i<n;i++) {
printf("[%.*s]",int(chunks[i].size),chunks[i].ptr);
}
if(completed) {
printf("\nEOF\n");
}
else {
printf("\n---\n");
}
}
#endif
size_t reminder = in.bytes_count();
size_t in_size = reminder;
size_t chunk_consumed = 0;
@@ -236,12 +254,16 @@ namespace cgi {
}
virtual void do_eof()
{
if(eof_callback_)
socket_.cancel();
eof_callback_ = false;
}

// This is not really correct because server may try
// to multiplex or ask control... But meanwhile it is good enough
virtual void async_read_eof(callback const &h)
{
eof_callback_ = true;
static char a;
async_read_from_socket(&a,1,boost::bind(h));
}
@@ -721,6 +743,7 @@ namespace cgi {

std::vector<char> cache_;
size_t cache_start_,cache_end_;
bool eof_callback_;

void reset_all()
{


+ 7
- 3
src/http_api.cpp View File

@@ -137,7 +137,8 @@ namespace cgi {
sync_option_is_set_(false),
in_watchdog_(false),
watchdog_(wd),
rewrite_(rw)
rewrite_(rw),
eof_callback_(false)
{

env_.add("SERVER_SOFTWARE",CPPCMS_PACKAGE_NAME "/" CPPCMS_PACKAGE_VERSION);
@@ -168,8 +169,7 @@ namespace cgi {
char const *uri = request_uri_;
if(!uri || *uri==0)
uri = "unknown";
booster::system::error_code e;
BOOSTER_INFO("cppcms_http") << "Timeout on connection for URI: " << uri << " from " << socket_.remote_endpoint(e).ip();
BOOSTER_INFO("cppcms_http") << "Timeout on connection for URI: " << uri << " from " << cgetenv("REMOTE_ADDR");
}

void die()
@@ -312,6 +312,9 @@ namespace cgi {
}
virtual void do_eof()
{
if(eof_callback_)
socket_.cancel();
eof_callback_ = false;
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.close(e);
@@ -681,6 +684,7 @@ namespace cgi {

booster::shared_ptr<http_watchdog> watchdog_;
booster::shared_ptr<url_rewriter> rewrite_;
bool eof_callback_;
};
void http_watchdog::check(booster::system::error_code const &e)
{


+ 221
- 46
src/http_response.cpp View File

@@ -54,13 +54,14 @@ namespace details {
return ss.str();
}

class closeable {
class closeable : public std::streambuf {
public:
virtual void close() = 0;
virtual ~closeable() {}
};

class copy_buf : public std::streambuf, public closeable {

class copy_buf : public closeable {
public:
copy_buf(std::streambuf *output = 0) :
out_(output)
@@ -127,11 +128,15 @@ namespace details {
}
int sync()
{
return overflow(EOF);
if(overflow(EOF) < 0)
return -1;
if(out_)
return out_->pubsync();
return 0;
}
void close()
{
pubsync();
overflow(EOF);
out_ = 0;
}
private:
@@ -142,7 +147,7 @@ namespace details {


template<typename Self>
class basic_obuf : public std::streambuf {
class basic_obuf : public closeable {
public:
basic_obuf(size_t n = 0)
{
@@ -182,7 +187,7 @@ namespace details {


#ifndef CPPCMS_NO_GZIP
class gzip_buf : public basic_obuf<gzip_buf>, public closeable {
class gzip_buf : public basic_obuf<gzip_buf> {
public:
gzip_buf(size_t n = 0) :
basic_obuf<gzip_buf>(n),
@@ -286,56 +291,224 @@ namespace details {
};

#endif

class output_device : public basic_obuf<output_device>, public closeable {
booster::weak_ptr<impl::cgi::connection> conn_;
bool final_;
bool eof_send_;
class basic_device : public closeable {
public:
output_device(impl::cgi::connection *conn,size_t n) :
basic_obuf<output_device>(n),
conn_(conn->shared_from_this()),
basic_device() :
final_(false),
eof_send_(false)
eof_send_(false),
buffer_size_(0)
{
}
void close()
void open(booster::weak_ptr<impl::cgi::connection> c,size_t n)
{
final_=true;
pubsync();
if(!eof_send_) {
do_write(booster::aio::const_buffer());
}
buffer_size_ = n;
do_setp();
conn_ = c;
}
int do_write(booster::aio::const_buffer const &in)

virtual bool do_write(impl::cgi::connection &c,booster::aio::const_buffer const &out,bool eof,booster::system::error_code &e) = 0;

int write(booster::aio::const_buffer const &out)
{
booster::system::error_code e;
return write(out,e);
}
int write(booster::aio::const_buffer const &out,booster::system::error_code &e)
{
bool set_eof = final_ && !eof_send_;
if(in.empty() && !set_eof)
if(out.empty())
return 0;

booster::shared_ptr<impl::cgi::connection> c = conn_.lock();
if(!c)
return -1;
booster::system::error_code e;

bool res = c->write(in,set_eof,e);
bool send_eof = final_ && !eof_send_;
eof_send_ = send_eof;
if(do_write(*c,out,send_eof,e)) {
return 0;
}
if(e) {
BOOSTER_WARNING("cppcms") << "Failed to write response:" << e.message();
conn_.reset();
return -1;

}
if(!res)
return 0;
}
virtual int sync()
{
return overflow(EOF);
}
virtual int overflow(int c)
{
char c_tmp=c;
booster::aio::const_buffer out=booster::aio::buffer(pbase(),pptr()-pbase());
if(c!=EOF)
out += booster::aio::buffer(&c_tmp,1);
booster::system::error_code e;
if(write(out)!=0)
return -1;
if(set_eof)
eof_send_ = true;
do_setp();
return 0;
}
int write(char const *data,int n)
virtual std::streamsize xsputn(const char* s, std::streamsize n)
{
if((epptr() - pptr()) >= n) {
memcpy(pptr(),s,n);
pbump(n);
return n;
}
else {
booster::aio::const_buffer out=booster::aio::buffer(pbase(),pptr()-pbase());
out+=booster::aio::buffer(s,n);
if(write(out)!=0)
return -1;
do_setp();
return n;
}
}

void do_setp()
{
output_.resize(buffer_size_);
if(buffer_size_ == 0)
setp(0,0);
else
setp(&output_[0],&output_[buffer_size_-1]+1);
}
int flush(booster::system::error_code &e)
{
return do_write(booster::aio::buffer(data,n));
int r = write(booster::aio::buffer(pbase(),pptr()-pbase()),e);
setp(pbase(),epptr());
return r;
}
virtual std::streambuf *setbuf(char const *,std::streamsize size)
{
buffer_size_ = size;
std::streamsize content_size = pptr() - pbase();
if(content_size > size) {
booster::system::error_code e;
if(flush(e)!=0)
return 0;
content_size = 0;
}
do_setp();
pbump(content_size);
return this;
}
void close()
{
if(eof_send_)
return;
final_=true;
booster::system::error_code e;
flush(e);
}
protected:
booster::weak_ptr<impl::cgi::connection> conn_;
bool final_;
bool eof_send_;
size_t buffer_size_;
std::vector<char> output_;
};

class async_io_buf : public basic_device {
public:
async_io_buf() : full_buffering_(true)
{
}

int full_buffering(bool buffering)
{
if(full_buffering_ == buffering)
return 0;
full_buffering_ = buffering;
if(full_buffering_ == false) {
if(pubsetbuf(0,buffer_size_)==0) {
return -1;
}
}
return 0;
}
std::streambuf *setbuf(char *s,std::streamsize size)
{
if(full_buffering_) {
buffer_size_ = size;
std::streamsize content_size = pptr() - pbase();
if(size_t(size) > output_.size())
output_.resize(size);
do_setp();
pbump(content_size);
return this;
}
return basic_device::setbuf(s,size);
}
size_t next_size(size_t in)
{
if(in == 0)
return 64;
return in * 2;
}
virtual int overflow(int c)
{
if(full_buffering_) {
if(pptr() == epptr()) {
size_t current_size = pptr() - pbase();
output_.resize(next_size(output_.size()));
setp(&output_[0],&output_[output_.size()-1]+1);
pbump(current_size);
}
if(c!=EOF) {
*pptr() = c;
pbump(1);
}
return 0;
}
else {
return basic_device::overflow(c);
}
}
virtual std::streamsize xsputn(const char* s, std::streamsize n)
{
if(full_buffering_) {
std::streamsize reminder = epptr() - pptr();
if(reminder < n) {
size_t current_size = pptr()-pbase();
size_t minimal_size = current_size + n;
size_t resize_size = next_size(output_.size());
while(resize_size < minimal_size)
resize_size *= 2;
output_.resize(resize_size);
setp(&output_[0],&output_[0]+resize_size);
pbump(current_size);
}
memcpy(pptr(),s,n);
pbump(n);
return n;
}
else {
return basic_device::xsputn(s,n);
}
}
virtual bool do_write(impl::cgi::connection &c,booster::aio::const_buffer const &out,bool eof,booster::system::error_code &e)
{
c.nonblocking_write(out,eof,e);
if(e)
return false;
return true;
}
private:
bool full_buffering_;
};


class output_device : public basic_device {
public:
virtual bool do_write(impl::cgi::connection &c,booster::aio::const_buffer const &out,bool eof,booster::system::error_code &e)
{
return c.write(out,eof,e);
}
};


}

struct response::_data {
@@ -345,18 +518,19 @@ struct response::_data {
std::list<std::string> added_headers;
std::list<details::closeable *> buffers;

details::copy_buf buffered;
details::async_io_buf buffered;
details::copy_buf cached;
#ifndef CPPCMS_NO_GZIP
details::gzip_buf zbuf;
#endif
details::output_device output_buf;
std::ostream output;
booster::weak_ptr<impl::cgi::connection> conn;

_data(impl::cgi::connection *conn) :
_data(impl::cgi::connection *c) :
headers(details::string_i_comp),
output_buf(conn,conn->service().cached_settings().service.output_buffer_size),
output(0)
output(0),
conn(c->shared_from_this())
{
}
};
@@ -538,6 +712,12 @@ std::string response::copied_data()
return tmp;
}

int response::flush_async_chunk(booster::system::error_code &e)
{
return d->buffered.flush(e);
}


std::ostream &response::out()
{
if(ostream_requested_)
@@ -547,10 +727,12 @@ std::ostream &response::out()
throw cppcms_error("Request for output stream for finalized request is illegal");
if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw) {
d->buffered.open(d->conn,context_.service().cached_settings().service.async_output_buffer_size);
d->output.rdbuf(&d->buffered);
d->buffers.push_front(&d->buffered);
}
else {
d->output_buf.open(d->conn,context_.service().cached_settings().service.output_buffer_size);
d->output.rdbuf(&d->output_buf);
d->buffers.push_front(&d->output_buf);
}
@@ -589,13 +771,6 @@ std::ostream &response::out()
return d->output;
}

response::chunk_type response::get_async_chunk()
{
chunk_type c;
c.second = d->buffered.getstr(c.first);
return c;
}

bool response::some_output_was_written()
{
return ostream_requested_;


+ 8
- 1
src/scgi_api.cpp View File

@@ -13,6 +13,7 @@
#include "cppcms_error_category.h"
#include <iostream>
#include <stdlib.h>
#include <stdio.h>
#include <cppcms/config.h>
#ifdef CPPCMS_USE_EXTERNAL_BOOST
# include <boost/bind.hpp>
@@ -35,7 +36,8 @@ namespace cgi {
connection(srv),
start_(0),
end_(0),
socket_(srv.impl().get_io_service())
socket_(srv.impl().get_io_service()),
eof_callback_(false)
{
}
~scgi()
@@ -133,6 +135,9 @@ namespace cgi {

virtual void do_eof()
{
if(eof_callback_)
socket_.cancel();
eof_callback_ = false;
booster::system::error_code e;
socket_.shutdown(io::stream_socket::shut_wr,e);
socket_.close(e);
@@ -140,6 +145,7 @@ namespace cgi {

virtual void async_read_eof(callback const &h)
{
eof_callback_ = true;
static char a;
socket_.async_read_some(io::buffer(&a,1),boost::bind(h));
}
@@ -155,6 +161,7 @@ namespace cgi {
friend class socket_acceptor<scgi>;
io::stream_socket socket_;
std::vector<char> buffer_;
bool eof_callback_;
};



Loading…
Cancel
Save