Browse Source

added setbuf API and blocking/nonblocking modes

gzip reorganization to support flush and improve buffering.
master
Artyom Beilis 8 years ago
parent
commit
b83f40dce3
7 changed files with 396 additions and 29 deletions
  1. +1
    -0
      CMakeLists.txt
  2. +41
    -0
      cppcms/http_response.h
  3. +161
    -23
      src/http_response.cpp
  4. +20
    -5
      tests/dummy_api.h
  5. +11
    -0
      tests/gzip_test.cpp
  6. +11
    -1
      tests/gzip_test.py
  7. +151
    -0
      tests/response_test.cpp

+ 1
- 0
CMakeLists.txt View File

@@ -678,6 +678,7 @@ set(ALL_TESTS
mount_point_test
todec_test
hash_map_test
response_test
)

if(NOT DISABLE_PREFORK_CACHE AND NOT IS_WINDOWS)


+ 41
- 0
cppcms/http_response.h View File

@@ -312,6 +312,47 @@ namespace http {
///
void finalize();

///
/// Defines the size of the output buffer for the output data.
///
/// Setting the to 0 makes the output non-buffered and prevents unnecessary copy of large objects in memory
///
/// The default is defined in configuration by service.output_buffer_size and service.async_output_buffer_size for synchronous and asynchronous
/// applications accordingly, can be called before and after out() was called
///
/// Calling with negative value would reset it to the default
///
/// Note: when gzip compression active, buffering is still present at intermediate levels
///
void setbuf(int buffer_size);
///
/// Sets full buffering mode for asynchronous applications
///
/// When set to true the output data is transferred only when async_flush_output or async_complete_response are called this
/// is the default behavior.
///
/// When set to false, the data is sent upon flushing of the std::ostream provided by out(). If I/O operation blocks,
/// the actual output is buffered, it is possible to test if blocking occurred by calling pending_blocked_output()
///
/// When the full buffering is disable, if an error occurs, the out() is set to fail state.
///
void full_asynchronous_buffering(bool enable);

///
/// Get current state of asynchronous buffering
///
bool full_asynchronous_buffering();

///
/// Returns true of some of the output wasn't sent due to blocking upon flushing of std::stream &out() which is used together with full_asynchronous_buffering(false) mode.
///
/// Note: it does not look on to internal buffers thus it is user's responsibility to call
///
/// Once the condition occurs you can flush the output asynchronously using async_complete_response or async_flush_output
///
bool pending_blocked_output();


/// \cond INTERNAL
response(context &);
~response();


+ 161
- 23
src/http_response.cpp View File

@@ -54,14 +54,24 @@ namespace details {
return ss.str();
}

class closeable : public std::streambuf {
class extended_streambuf : public std::streambuf {
public:
virtual std::pair<char *,char *> write_range()
{
return std::pair<char *,char *>(pptr(),epptr());
}
virtual int consume_range(int n) {
pbump(n);
if(pptr()==epptr())
return overflow(EOF);
return 0;
}
virtual void close() = 0;
virtual ~closeable() {}
virtual ~extended_streambuf() {}
};


class copy_buf : public closeable {
class copy_buf : public extended_streambuf {
public:
copy_buf(std::streambuf *output = 0) :
out_(output)
@@ -147,7 +157,7 @@ namespace details {


template<typename Self>
class basic_obuf : public closeable {
class basic_obuf : public extended_streambuf {
public:
basic_obuf(size_t n = 0)
{
@@ -178,7 +188,12 @@ namespace details {
}
int sync()
{
return overflow(EOF);
if(overflow(EOF) != 0)
return -1;
if(self().flush() != 0)
return -1;
return 0;

}
private:
size_t expected_size_;
@@ -198,7 +213,8 @@ namespace details {
buffer_(4096)
{
}
int write(char const *p,int n)
/*
int do_write(char const *p,int n,bool flush)
{
if(!out_ || !opened_) {
return 0;
@@ -215,7 +231,7 @@ namespace details {
z_stream_.avail_out = chunk_.size();
z_stream_.next_out = (Bytef*)(&chunk_[0]);

deflate(&z_stream_,Z_NO_FLUSH);
deflate(&z_stream_,(flush ? Z_SYNC_FLUSH : Z_NO_FLUSH));
int have = chunk_.size() - z_stream_.avail_out;
if(out_->sputn(&chunk_[0],have)!=have) {
close();
@@ -225,6 +241,14 @@ namespace details {

return 0;
}
int write(char const *p,int n)
{
return do_write(p,n,false);
}
int flush()
{
return do_write("",0,true);
}
void close()
{
if(!opened_)
@@ -252,11 +276,77 @@ namespace details {
out_ = 0;

}
void open(std::streambuf *out,int level = -1,int buffer_size=-1)
*/
int do_write(char const *p,int n,int flush_flag)
{
if(!out_ || !opened_) {
return 0;
}

if(n==0 && flush_flag==Z_NO_FLUSH) {
return 0;
}

z_stream_.avail_in = n;
z_stream_.next_in = (Bytef*)(p);

do {
std::pair<char *,char *> range = out_->write_range();
int have_space = (range.second - range.first);
if(have_space > 0) {
z_stream_.avail_out = have_space;
z_stream_.next_out = (Bytef*)(range.first);
deflate(&z_stream_,flush_flag);
int have = have_space - z_stream_.avail_out;
if(out_->consume_range(have)!=0) {
out_ = 0;
return -1;
}
}
else {
chunk_.resize(1024);
z_stream_.avail_out = chunk_.size();
z_stream_.next_out = (Bytef*)(&chunk_[0]);
deflate(&z_stream_,flush_flag);
int have = chunk_.size() - z_stream_.avail_out;
if(out_->sputn(&chunk_[0],have)!=have) {
out_ = 0;
return -1;
}
}
} while(z_stream_.avail_out == 0);

return 0;
}
int write(char const *p,int n)
{
return do_write(p,n,Z_NO_FLUSH);
}
int flush()
{
return do_write("",0,Z_SYNC_FLUSH);
}
void close()
{
if(!opened_)
return;
pubsync();

if(out_)
do_write(0,0,Z_FINISH);
if(out_)
out_->pubsync();

deflateEnd(&z_stream_);
opened_ = false;
z_stream_ = z_stream();
chunk_.clear();
out_ = 0;

}
void open(extended_streambuf *out,int level,int buffer_size)
{
level_ = level;
if(buffer_size == -1)
buffer_size = 4096;
buffer_ = buffer_size;
out_ = out;
if(deflateInit2(&z_stream_,
@@ -274,7 +364,6 @@ namespace details {
throw booster::runtime_error(error);
}
opened_ = true;
chunk_.resize(buffer_,0);
}
~gzip_buf()
{
@@ -285,13 +374,13 @@ namespace details {
bool opened_;
std::vector<char> chunk_;
z_stream z_stream_;
std::streambuf *out_;
extended_streambuf *out_;
int level_;
size_t buffer_;
};

#endif
class basic_device : public closeable {
class basic_device : public extended_streambuf {
public:
basic_device() :
final_(false),
@@ -313,9 +402,10 @@ namespace details {
booster::system::error_code e;
return write(out,e);
}
int write(booster::aio::const_buffer const &out,booster::system::error_code &e)
{
if(out.empty())
if(out.empty())
return 0;
booster::shared_ptr<impl::cgi::connection> c = conn_.lock();
if(!c)
@@ -353,7 +443,6 @@ namespace details {
if((epptr() - pptr()) >= n) {
memcpy(pptr(),s,n);
pbump(n);
return n;
}
else {
booster::aio::const_buffer out=booster::aio::buffer(pbase(),pptr()-pbase());
@@ -361,8 +450,8 @@ namespace details {
if(write(out)!=0)
return -1;
do_setp();
return n;
}
return n;
}

void do_setp()
@@ -379,7 +468,7 @@ namespace details {
setp(pbase(),epptr());
return r;
}
virtual std::streambuf *setbuf(char const *,std::streamsize size)
virtual std::streambuf *setbuf(char *,std::streamsize size)
{
buffer_size_ = size;
std::streamsize content_size = pptr() - pbase();
@@ -427,6 +516,10 @@ namespace details {
}
return 0;
}
bool full_buffering()
{
return full_buffering_;
}
std::streambuf *setbuf(char *s,std::streamsize size)
{
if(full_buffering_) {
@@ -516,7 +609,7 @@ struct response::_data {
typedef std::map<std::string,std::string,compare_type> headers_type;
headers_type headers;
std::list<std::string> added_headers;
std::list<details::closeable *> buffers;
std::list<details::extended_streambuf *> buffers;

details::async_io_buf buffered;
details::copy_buf cached;
@@ -526,11 +619,16 @@ struct response::_data {
details::output_device output_buf;
std::ostream output;
booster::weak_ptr<impl::cgi::connection> conn;
int required_buffer_size;
bool full_buffering;

_data(impl::cgi::connection *c) :
headers(details::string_i_comp),
output(0),
conn(c->shared_from_this())
conn(c->shared_from_this()),
required_buffer_size(-1),
full_buffering(true)

{
}
};
@@ -604,12 +702,46 @@ void response::finalize()
{
if(!finalized_) {
out();
for(std::list<details::closeable *>::iterator p=d->buffers.begin();p!=d->buffers.end();++p)
for(std::list<details::extended_streambuf *>::iterator p=d->buffers.begin();p!=d->buffers.end();++p)
(*p)->close();
finalized_=1;
}
}

void response::setbuf(int buffer_size)
{
if(buffer_size < 0)
buffer_size = -1;
d->required_buffer_size = buffer_size;

if(ostream_requested_) {
if(buffer_size < 0) {
if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw)
buffer_size = context_.service().cached_settings().service.async_output_buffer_size;
else
buffer_size = context_.service().cached_settings().service.output_buffer_size;
}
d->buffers.back()->pubsetbuf(0,buffer_size);
}
}

void response::full_asynchronous_buffering(bool enable)
{
d->buffered.full_buffering(enable);
}
bool response::full_asynchronous_buffering()
{
return d->buffered.full_buffering();
}
bool response::pending_blocked_output()
{
booster::shared_ptr<impl::cgi::connection> conn = d->conn.lock();
if(!conn)
return false;
return conn->has_pending();
}


std::string response::get_header(std::string const &name)
{
_data::headers_type::const_iterator p=d->headers.find(name);
@@ -727,12 +859,18 @@ std::ostream &response::out()
throw cppcms_error("Request for output stream for finalized request is illegal");
if(io_mode_ == asynchronous || io_mode_ == asynchronous_raw) {
d->buffered.open(d->conn,context_.service().cached_settings().service.async_output_buffer_size);
size_t bsize = context_.service().cached_settings().service.async_output_buffer_size;
if(d->required_buffer_size != -1)
bsize = d->required_buffer_size;
d->buffered.open(d->conn,bsize);
d->output.rdbuf(&d->buffered);
d->buffers.push_front(&d->buffered);
}
else {
d->output_buf.open(d->conn,context_.service().cached_settings().service.output_buffer_size);
size_t bsize = context_.service().cached_settings().service.output_buffer_size;
if(d->required_buffer_size != -1)
bsize = d->required_buffer_size;
d->output_buf.open(d->conn,bsize);
d->output.rdbuf(&d->output_buf);
d->buffers.push_front(&d->output_buf);
}
@@ -761,7 +899,7 @@ std::ostream &response::out()
if(gzip) {
int level=context_.service().cached_settings().gzip.level;
int buffer=context_.service().cached_settings().gzip.buffer;
d->zbuf.open(d->output.rdbuf(),level,buffer);
d->zbuf.open(d->buffers.front(),level,buffer);
d->output.rdbuf(&d->zbuf);
d->buffers.push_front(&d->zbuf);
}


+ 20
- 5
tests/dummy_api.h View File

@@ -16,9 +16,11 @@ using cppcms::impl::cgi::callback;

class dummy_api : public cppcms::impl::cgi::connection {
public:
dummy_api(cppcms::service &srv,std::map<std::string,std::string> env,std::string &output) :
dummy_api(cppcms::service &srv,std::map<std::string,std::string> env,std::string &output,bool mark_chunks=false,bool write_eof=false) :
cppcms::impl::cgi::connection(srv),
output_(&output)
output_(&output),
write_eof_(write_eof),
mark_chunks_(mark_chunks)
{
for(std::map<std::string,std::string>::iterator p=env.begin();p!=env.end();++p)
env_.add(pool_.add(p->first),pool_.add(p->second));
@@ -40,12 +42,23 @@ public:

virtual void do_eof(){}
virtual void on_some_output_written() {}
virtual bool write(booster::aio::const_buffer const &in,bool,booster::system::error_code &)
virtual bool write(booster::aio::const_buffer const &in,bool eof,booster::system::error_code &)
{
std::pair<booster::aio::const_buffer::entry const *,size_t> all=in.get();
for(size_t i=0;i<all.second;i++)
for(size_t i=0;i<all.second;i++) {
if(mark_chunks_)
output_->append("[");
output_->append(reinterpret_cast<char const *>(all.first[i].ptr),all.first[i].size);
return in.bytes_count();
if(mark_chunks_)
output_->append("]");
}
if(eof && write_eof_)
output_->append("[EOF]");
return true;
}
virtual bool nonblocking_write(booster::aio::const_buffer const &in,bool eof,booster::system::error_code &e)
{
return write(in,eof,e);
}
virtual booster::aio::stream_socket &socket()
{
@@ -66,6 +79,8 @@ public:
}
private:
std::string *output_;
bool write_eof_;
bool mark_chunks_;

};



+ 11
- 0
tests/gzip_test.cpp View File

@@ -23,6 +23,7 @@ public:
{
dispatcher().assign("/ca",&unit_test::compressed_a,this);
dispatcher().assign("/cb",&unit_test::compressed_b,this);
dispatcher().assign("/cbig",&unit_test::compressed_big,this);
dispatcher().assign("/bin",&unit_test::binary,this);
dispatcher().assign("/not",&unit_test::not_compressed,this);
}
@@ -35,6 +36,16 @@ public:
response().set_plain_text_header();
response().out()<< "test b";
}
void compressed_big()
{
response().set_plain_text_header();
int total = 100000;
for(int i=0;i<total;i++) {
response().out() << i << "\n";
if(i==total / 2)
response().out() << std::flush;
}
}
void binary()
{
response().content_type("application/octet-stream");


+ 11
- 1
tests/gzip_test.py View File

@@ -23,7 +23,12 @@ def decompress(content):
zstream = gzip.GzipFile(mode='r', fileobj=virt)
result = zstream.read()
return result

def big():
slist=[]
for x in xrange(0,100000):
slist.append(str(x)+'\n')
return ''.join(slist)


def test_valid(url,accepts,compressed,expected):
@@ -39,8 +44,13 @@ def test_valid(url,accepts,compressed,expected):

test_valid('/ca',True,True,'test a')
test_valid('/ca',False,False,'test a')

test_valid('/cb',True,True,'test b')
test_valid('/cb',False,False,'test b')
bg=big()
test_valid('/cbig',True,True,bg)
test_valid('/cbig',False,False,bg)

test_valid('/bin',True,False,'binary')
test_valid('/bin',False,False,'binary')
test_valid('/not',True,False,'not compressed')


+ 151
- 0
tests/response_test.cpp View File

@@ -0,0 +1,151 @@
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (C) 2008-2012 Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>
//
// See accompanying file COPYING.TXT file for licensing details.
//
///////////////////////////////////////////////////////////////////////////////
#include "tc_test_content.h"
#include <cppcms/service.h>
#include <cppcms/http_response.h>
#include <cppcms/json.h>
#include <cppcms/cache_interface.h>
#include <cppcms/url_mapper.h>
#include "dummy_api.h"
#include "test.h"

#include <iomanip>
#include <sstream>

void compare_strings(std::string const &l,std::string const &r,int file_line)
{
if(l==r) {
//std::cerr << "[" << l << "] == [" << r << "]" << " at " << file_line << " OK !" << std::endl;
return;
}
/*
size_t m = l.size();
if(r.size() > m) m = r.size();
int line = 1;
for(size_t i=0;i<m;i++) {
std::string lstr = conv(l,i);
std::string rstr = conv(r,i);
if(lstr=="\\n")
line++;
std::cerr << std::setw(4) << line << " [" << lstr << '|' << rstr << "] ";
if(lstr!=rstr)
std::cerr << "<----------" << std::endl;
else
std::cerr << std::endl;
}*/
std::cerr << "[" << l << "]!=[" << r << "]" << " at " << file_line << std::endl;
throw std::runtime_error("Failed test");

}

#define TEQ(l,r) compare_strings(l,r,__LINE__)

class test_app : public cppcms::application {
public:
test_app(cppcms::service &srv) :
cppcms::application(srv),
srv_(srv)
{
mapper().assign("foo","/foo");
mapper().assign("/");
mapper().assign("/{1}");
mapper().assign("/{1}/{2}");
}
~test_app()
{
release_context();
}
void set_context(bool mark_chunks=false,bool mark_eof=false)
{
std::map<std::string,std::string> env;
env["HTTP_HOST"]="www.example.com";
env["SCRIPT_NAME"]="/foo";
env["PATH_INFO"]="/bar";
env["REQUEST_METHOD"]="GET";
env["HTTP_ACCEPT_ENCODING"]=="gzip";
booster::shared_ptr<dummy_api> api(new dummy_api(srv_,env,output_,mark_chunks,mark_eof));
booster::shared_ptr<cppcms::http::context> cnt(new cppcms::http::context(api));
assign_context(cnt);
output_.clear();
}

std::string str()
{
std::string result = output_;
output_.clear();
return result;
}

void test_buffer_size(bool cache_it,bool async,bool zipit)
{
std::cout << "- Test setbuf/flush " << std::endl;
std::cout << "-- " << (cache_it ? "with": "without") << " cache, " << (zipit ? "with" : "without") << " gzip" << " mode " << (async ? "async" : "sync")<< std::endl;
set_context(true,true);
response().setbuf(0);
if(cache_it) {
cache().fetch_page("none");
}
if(async) {
zipit = false;
response().io_mode(cppcms::http::response::asynchronous);
}
else {
if(!zipit)
response().io_mode(cppcms::http::response::nogzip);
}
response().out();
str();
response().out() << "x";
TEQ(str(),"[x]");
response().out() << 123;
TEQ(str(),"[123]");
response().setbuf(4);
response().out() << "abcdefg";
TEQ(str(),"[abcdefg]");
response().out() << 124;
TEQ(str(),"");
response().out() << std::flush;
TEQ(str(),"[124]");
response().out() << "xxx";
TEQ(str(),"");
response().setbuf(0);
TEQ(str(),"[xxx]");
cache().store_page("something else");
TEQ(str(),"[EOF]");
}

private:
std::string output_;
cppcms::service &srv_;
};


int main(int argc,char **argv)
{
try {
cppcms::json::value cfg;
cfg["cache"]["backend"]="thread_shared";
cfg["cache"]["limit"]=100;
cppcms::service srv(cfg);
test_app app(srv);

app.test_buffer_size(false,false,false);
app.test_buffer_size(true,false,false);

}
catch(std::exception const &e)
{
std::cerr << "Fail " << e.what() << std::endl;
return 1;
}
std::cout << "Ok" << std::endl;
return 0;
}




Loading…
Cancel
Save