From b4c3856ca2b6a625f63a5a84493f246aff95315e Mon Sep 17 00:00:00 2001 From: Artyom Beilis Date: Sat, 21 Nov 2015 21:16:21 +0000 Subject: [PATCH] Added tests for non-blocking I/O Fixed issue with parsing of partial CGI headers in HTTP backend --- CMakeLists.txt | 2 +- private/http_parser.h | 66 +++++++++++++++++++++++++------ src/cgi_api.cpp | 4 ++ src/http_api.cpp | 43 +++++++++++++------- tests/disco_test.cpp | 62 ++++++++++++++++++++++++++++- tests/disco_test.py | 6 +++ tests/disco_test_async_cgi_nonblocking.in | 11 ++++++ tests/disco_test_async_nonblocking.in | 2 + tests/proto_test.cpp | 20 +++++++++- tests/proto_test.js | 2 +- 10 files changed, 185 insertions(+), 33 deletions(-) create mode 100644 tests/disco_test_async_cgi_nonblocking.in create mode 100644 tests/disco_test_async_nonblocking.in diff --git a/CMakeLists.txt b/CMakeLists.txt index f53cc39..e78b8a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -940,7 +940,7 @@ foreach(LOC client server both) endforeach() endforeach() -foreach(TYPE async sync) +foreach(TYPE async sync nonblocking) add_test(proto_test_${TYPE}_http proto_test "-c" "${CNF}/proto_test.js" "--test-async=${TYPE}" "--service-api=http" "--service-port=8080" "--service-ip=127.0.0.1" diff --git a/private/http_parser.h b/private/http_parser.h index f0d3284..9e60fb5 100644 --- a/private/http_parser.h +++ b/private/http_parser.h @@ -10,6 +10,7 @@ #include "http_protocol.h" #include #include +#include #ifdef getc #undef getc @@ -35,8 +36,10 @@ class parser { unsigned bracket_counter_; - std::vector &body_; - unsigned &body_ptr_; + std::vector *body_; + unsigned *body_ptr_; + char const **pbase_,**pptr_,**epptr_; + std::stack ungot_; // Non copyable @@ -46,23 +49,45 @@ class parser { protected: inline int getc() { - if(body_ptr_ < body_.size()) { - return (unsigned char)body_[body_ptr_++]; + if(!ungot_.empty()) { + unsigned char r=ungot_.top(); + ungot_.pop(); + return r; + } + if(body_) { + if(*body_ptr_ < body_->size()) { + return (unsigned char)(*body_)[(*body_ptr_)++]; + } + else { + body_->clear(); + *body_ptr_=0; + return -1; + } } else { - body_.clear(); - body_ptr_=0; - return -1; + if(*pptr_ != *epptr_) { + unsigned char c = *(*pptr_)++; + return c; + } + else { + return -1; + } } } inline void ungetc(int c) { - if(body_ptr_ > 0) { - body_ptr_--; - body_[body_ptr_]=c; + if(body_) { + if(*body_ptr_ > 0) + (*body_ptr_)--; + else + ungot_.push(c); } else { - body_.insert(body_.begin(),c); + if(*pbase_!=*pptr_) + (*pptr_)--; + else + ungot_.push(c); + } } @@ -72,8 +97,23 @@ public: parser(std::vector &body,unsigned &body_ptr) : state_(idle), bracket_counter_(0), - body_(body), - body_ptr_(body_ptr) + body_(&body), + body_ptr_(&body_ptr), + pbase_(0), + pptr_(0), + epptr_(0) + { + header_.reserve(32); + } + parser(char const *&pbase,char const *&pptr,char const *&epptr) : + state_(idle), + bracket_counter_(0), + body_(0), + body_ptr_(0), + pbase_(&pbase), + pptr_(&pptr), + epptr_(&epptr) + { header_.reserve(32); } diff --git a/src/cgi_api.cpp b/src/cgi_api.cpp index c0af8fa..40475f3 100644 --- a/src/cgi_api.cpp +++ b/src/cgi_api.cpp @@ -494,6 +494,8 @@ bool connection::write(booster::aio::const_buffer const &buf,bool eof,booster::s booster::aio::const_buffer new_data = format_output(buf,eof,e); if(e) return false; booster::aio::const_buffer output = pending_output_.empty() ? new_data : (booster::aio::buffer(pending_output_) + new_data); + if(output.empty()) + return true; socket().set_non_blocking_if_needed(false,e); if(e) return false; @@ -512,6 +514,8 @@ bool connection::nonblocking_write(booster::aio::const_buffer const &buf,bool eo booster::aio::const_buffer new_data = format_output(buf,eof,e); if(e) return false; booster::aio::const_buffer output = pending_output_.empty() ? new_data : (booster::aio::buffer(pending_output_) + new_data); + if(output.empty()) + return true; socket().set_non_blocking_if_needed(true,e); if(e) return false; diff --git a/src/http_api.cpp b/src/http_api.cpp index b952145..4561620 100644 --- a/src/http_api.cpp +++ b/src/http_api.cpp @@ -126,8 +126,10 @@ namespace cgi { socket_(srv.impl().get_io_service()), input_body_ptr_(0), input_parser_(input_body_,input_body_ptr_), - output_body_ptr_(0), - output_parser_(output_body_,output_body_ptr_), + output_pbase_(0), + output_pptr_(0), + output_epptr_(0), + output_parser_(output_pbase_,output_pptr_,output_epptr_), request_method_(non_const_empty_string), request_uri_(non_const_empty_string), headers_done_(false), @@ -447,8 +449,13 @@ namespace cgi { std::pair tmp = in.get(); booster::aio::const_buffer::entry const *entry = tmp.first; size_t parts = tmp.second; + output_pbase_=output_pptr_=output_epptr_=0; + bool r=false; while(parts > 0) { - bool r =process_output_headers(entry->ptr,entry->size,e); + output_pbase_=static_cast(entry->ptr); + output_pptr_ = output_pbase_; + output_epptr_ = output_pbase_ + entry->size; + r =process_output_headers(e); parts--; entry++; if(r) { @@ -459,28 +466,34 @@ namespace cgi { } if(e) return packet; - packet+= booster::aio::buffer(response_line_); - packet+= booster::aio::buffer(output_body_); - while(parts > 0) { - packet+= booster::aio::buffer(entry->ptr,entry->size); - parts --; - entry++; + if(!r) { + tmp = in.get(); + entry = tmp.first; + parts = tmp.second; + output_body_.reserve(output_body_.size() + in.bytes_count()); + while(parts > 0) { + output_body_.insert(output_body_.end(),entry->ptr,entry->ptr+entry->size); + parts--; + entry++; + } + return packet; } + packet+= booster::aio::buffer(response_line_); + if(!output_body_.empty()) + packet+= booster::aio::buffer(output_body_); + packet+= in; return packet; } private: - bool process_output_headers(void const *p,size_t s,booster::system::error_code &e) + bool process_output_headers(booster::system::error_code &e) { static char const *addon = "Server: CppCMS-Embedded/" CPPCMS_PACKAGE_VERSION "\r\n" "Connection: close\r\n"; - char const *ptr=static_cast(p); - output_body_.insert(output_body_.end(),ptr,ptr+s); using cppcms::http::impl::parser; - for(;;) { switch(output_parser_.step()) { case parser::more_data: @@ -660,7 +673,9 @@ namespace cgi { unsigned input_body_ptr_; ::cppcms::http::impl::parser input_parser_; std::vector output_body_; - unsigned output_body_ptr_; + char const *output_pbase_; + char const *output_pptr_; + char const *output_epptr_; ::cppcms::http::impl::parser output_parser_; diff --git a/tests/disco_test.cpp b/tests/disco_test.cpp index 709aaae..508cd99 100644 --- a/tests/disco_test.cpp +++ b/tests/disco_test.cpp @@ -98,6 +98,60 @@ public: }; +class nonblocking_unit_test : public cppcms::application { +public: + nonblocking_unit_test(cppcms::service &s) : cppcms::application(s) + { + } + struct binder : public booster::callable { + typedef booster::intrusive_ptr self_ptr; + booster::shared_ptr context; + int counter; + binder(booster::shared_ptr ctx) : + context(ctx), + counter(0) + { + } + void run() + { + (*this)(cppcms::http::context::operation_completed); + } + void operator()(cppcms::http::context::completion_type ct) + { + if(ct == cppcms::http::context::operation_aborted) { + std::cout << "Error on completion detected" << std::endl; + bad_count++; + return; + } + std::ostream &out = context->response().out(); + for(;counter < 100000;counter++) { + out << counter << '\n'; + if(!out) { + std::cout << "Error on stream detected" << std::endl; + bad_count++; + return; + } + if(context->response().pending_blocked_output()) { + std::cout << "Got blocking status at" << counter << std::endl; + context->async_flush_output(self_ptr(this)); + return; + } + } + std::cout << "No error detected" << std::endl; + context->async_complete_response(); + } + }; + + void main(std::string) + { + response().setbuf(0); + response().full_asynchronous_buffering(false); + calls ++; + binder::self_ptr p(new binder(release_context())); + p->run(); + } +}; + int main(int argc,char **argv) @@ -105,7 +159,11 @@ int main(int argc,char **argv) try { cppcms::service srv(argc,argv); booster::intrusive_ptr async = new async_unit_test(srv); + booster::intrusive_ptr nb = new nonblocking_unit_test(srv); + srv.applications_pool().mount( async, cppcms::mount_point("/async") ); + srv.applications_pool().mount( nb, cppcms::mount_point("/nonblocking") ); + srv.applications_pool().mount( cppcms::applications_factory(), cppcms::mount_point("/sync")); srv.after_fork(submitter(srv)); srv.run(); @@ -114,8 +172,8 @@ int main(int argc,char **argv) std::cerr << e.what() << std::endl; return EXIT_FAILURE; } - if(bad_count != 3 || calls != 4) { - std::cerr << "Failed bad_count = " << bad_count << " calls = " << calls << std::endl; + if(bad_count != 4 || calls != 5) { + std::cerr << "Failed bad_count = " << bad_count << " (exp 4) calls = " << calls << " (exp 5)"<< std::endl; return EXIT_FAILURE; } std::cout << "Ok" << std::endl; diff --git a/tests/disco_test.py b/tests/disco_test.py index e548319..26f1d66 100755 --- a/tests/disco_test.py +++ b/tests/disco_test.py @@ -56,6 +56,8 @@ if test=='http': test_io(input,socket_type,target); input = load_file('disco_test_async_single.in'); test_io(input,socket_type,target); + input = load_file('disco_test_async_nonblocking.in'); + test_io(input,socket_type,target); elif test=='fastcgi_tcp' or test=='fastcgi_unix': input = tofcgi.to_fcgi_request(load_file('disco_test_norm_cgi.in')); test_io(input,socket_type,target); @@ -65,6 +67,8 @@ elif test=='fastcgi_tcp' or test=='fastcgi_unix': test_io(input,socket_type,target); input = tofcgi.to_fcgi_request(load_file('disco_test_async_cgi_single.in')); test_io(input,socket_type,target); + input = tofcgi.to_fcgi_request(load_file('disco_test_async_cgi_nonblocking.in')); + test_io(input,socket_type,target); elif test=='scgi_tcp' or test=='scgi_unix': input = toscgi.toscgi(load_file('disco_test_norm_cgi.in')); test_io(input,socket_type,target); @@ -74,6 +78,8 @@ elif test=='scgi_tcp' or test=='scgi_unix': test_io(input,socket_type,target); input = toscgi.toscgi(load_file('disco_test_async_cgi_single.in')); test_io(input,socket_type,target); + input = toscgi.toscgi(load_file('disco_test_async_cgi_nonblocking.in')); + test_io(input,socket_type,target); else: usege() diff --git a/tests/disco_test_async_cgi_nonblocking.in b/tests/disco_test_async_cgi_nonblocking.in new file mode 100644 index 0000000..bea24a0 --- /dev/null +++ b/tests/disco_test_async_cgi_nonblocking.in @@ -0,0 +1,11 @@ +GATEWAY_INTERFACE:CGI/1.0 +PATH_INFO: +REMOTE_ADDR: +REMOTE_HOST: +REQUEST_METHOD:GET +SCRIPT_NAME:/nonblocking +SERVER_NAME:127.0.0.1 +SERVER_PORT:8080 +SERVER_PROTOCOL:HTTP/1.0 +SERVER_SOFTWARE:CppCMS/1.1.0 + diff --git a/tests/disco_test_async_nonblocking.in b/tests/disco_test_async_nonblocking.in new file mode 100644 index 0000000..bad8e19 --- /dev/null +++ b/tests/disco_test_async_nonblocking.in @@ -0,0 +1,2 @@ +GET /nonblocking HTTP/1.0 + diff --git a/tests/proto_test.cpp b/tests/proto_test.cpp index 85c3be9..6d12feb 100644 --- a/tests/proto_test.cpp +++ b/tests/proto_test.cpp @@ -17,6 +17,7 @@ #include "test.h" bool is_async; +bool is_nonblocking; class unit_test : public cppcms::application { public: @@ -26,6 +27,7 @@ public: virtual void main(std::string /*unused*/) { response().set_plain_text_header(); + response().setbuf(64); TEST(is_async == is_asynchronous()); if(!is_asynchronous()) { TEST(response().io_mode() == cppcms::http::response::normal); @@ -34,6 +36,9 @@ public: else { TEST(response().io_mode() == cppcms::http::response::asynchronous); } + if(is_nonblocking) { + response().full_asynchronous_buffering(false); + } std::map env=request().getenv(); std::ostream &out = response().out(); for(std::map::const_iterator p=env.begin();p!=env.end();++p) { @@ -62,8 +67,19 @@ int main(int argc,char **argv) srv.applications_pool().mount( cppcms::applications_factory()); } else { - is_async = true; - std::cout << "Asynchronous testing" << std::endl; + if(srv.settings().get("test.async")=="async") { + is_async = true; + std::cout << "Asynchronous testing" << std::endl; + } + else if(srv.settings().get("test.async")=="nonblocking") { + is_async = true; + is_nonblocking = true; + std::cout << "Non blocking testing" << std::endl; + } + else { + std::cerr << "Invalid configuration value of test.async" << std::endl; + return 1; + } app=new unit_test(srv); srv.applications_pool().mount(app); } diff --git a/tests/proto_test.js b/tests/proto_test.js index e0bcf92..e63b8fa 100644 --- a/tests/proto_test.js +++ b/tests/proto_test.js @@ -4,7 +4,7 @@ "worker_threads" : 5 }, "http" : { - "script_names" : [ "/test" , "/async" , "/sync" ] + "script_names" : [ "/test" , "/async" , "/sync", "/nonblocking" ] }, "localization" : { "messages" : {