@@ -27,8 +27,6 @@ struct application::data { | |||
} | |||
cppcms::service *service; | |||
intrusive_ptr<http::context> conn; | |||
typedef std::set<intrusive_ptr<http::context> > all_conn_type; | |||
all_conn_type all_conn; | |||
int pool_id; | |||
url_dispatcher url; | |||
std::vector<application *> managed_children; | |||
@@ -76,6 +74,10 @@ url_dispatcher &application::dispatcher() | |||
return d->url; | |||
} | |||
intrusive_ptr<http::context> application::get_context() | |||
{ | |||
return root()->d->conn; | |||
} | |||
http::context &application::context() | |||
{ | |||
@@ -84,9 +86,11 @@ http::context &application::context() | |||
return *root()->d->conn; | |||
} | |||
http::context *application::last_assigned_context() | |||
intrusive_ptr<http::context> application::release_context() | |||
{ | |||
return &*root()->d->conn; | |||
intrusive_ptr<http::context> ptr=root()->d->conn; | |||
assign_context(0); | |||
return ptr; | |||
} | |||
@@ -111,14 +115,7 @@ bool application::is_asynchronous() | |||
void application::assign_context(intrusive_ptr<http::context> conn) | |||
{ | |||
if(parent()!=this) | |||
throw cppcms_error("Can assign context to only root of applications tree"); | |||
if(conn == 0) | |||
d->conn = 0; | |||
else { | |||
d->conn=conn; | |||
d->all_conn.insert(conn); | |||
} | |||
root()->d->conn=conn; | |||
} | |||
void application::pool_id(int id) | |||
@@ -171,25 +168,14 @@ void application::assign(application *app,std::string regex,int part) | |||
add(*app,regex,part); | |||
} | |||
void application::release_context(http::context *conn) | |||
{ | |||
intrusive_ptr<http::context> context=conn; | |||
context->response().out() << std::flush; | |||
context->response().finalize(); | |||
context->service().post(boost::bind(&http::context::on_response_complete,context)); | |||
root()->d->all_conn.erase(context); | |||
} | |||
void application::release_all_contexts() | |||
void application::recycle() | |||
{ | |||
root()->d->conn=0; | |||
for(data::all_conn_type::iterator p=root()->d->all_conn.begin();p!=root()->d->all_conn.end();++p) { | |||
intrusive_ptr<http::context> context=*p; | |||
context->response().out() << std::flush; | |||
context->response().finalize(); | |||
context->service().post(boost::bind(&http::context::on_response_complete,context)); | |||
if(root()->d->conn) { | |||
response().out() << std::flush; | |||
response().finalize(); | |||
context().async_complete_response(); | |||
} | |||
root()->d->all_conn.clear(); | |||
assign_context(0); | |||
} | |||
void intrusive_ptr_add_ref(application *app) | |||
@@ -210,7 +196,7 @@ void intrusive_ptr_release(application *app) | |||
cppcms::service &service=app->service(); | |||
try { | |||
app->release_all_contexts(); | |||
app->recycle(); | |||
} | |||
catch(...) { | |||
if(app->pool_id() < 0) { | |||
@@ -46,7 +46,6 @@ namespace cppcms { | |||
char const *gt(char const *s); | |||
char const *ngt(char const *s,char const *p,int n); | |||
void assign_context(intrusive_ptr<http::context> conn); | |||
void add(application &app); | |||
void add(application &app,std::string regex,int part); | |||
@@ -57,14 +56,15 @@ namespace cppcms { | |||
application *parent(); | |||
application *root(); | |||
http::context *last_assigned_context(); | |||
void release_context(http::context *conn); | |||
void release_all_contexts(); | |||
intrusive_ptr<http::context> release_context(); | |||
intrusive_ptr<http::context> get_context(); | |||
void assign_context(intrusive_ptr<http::context> conn); | |||
bool is_asynchronous(); | |||
private: | |||
void recycle(); | |||
void parent(application *parent); | |||
void pool_id(int id); | |||
@@ -14,6 +14,100 @@ | |||
#include <stdexcept> | |||
#include <stdlib.h> | |||
/* | |||
class chat_room : public cppcms::application { | |||
public: | |||
chat_room(cppcms::service &srv,int idle) : | |||
cppcms::application(srv), | |||
timer_(srv), | |||
idle_time_(idle) | |||
{ | |||
on_timeout(true); | |||
dispatcher().assign("^.*$",cppcms::util::mem_bind(&chat_room::json_call,self())); | |||
} | |||
private: | |||
void on_error() | |||
{ | |||
response().status(cppcms::http::response::bad_request); | |||
release_context(last_assigned_context()); | |||
return; | |||
} | |||
void json_call() | |||
{ | |||
using namespace cppcms; | |||
if(request().content_type()!="application/json") { | |||
on_error(); | |||
} | |||
std::pair<void *,size_t> data=request().raw_post_data(); | |||
std::istringstream ss(std::string((char *)data.first,data.second)); | |||
json::value v; | |||
if(!v.load(ss,true)) { | |||
on_error(); | |||
return; | |||
} | |||
if( v.find("method").type()!=cppcms::json::is_string | |||
|| v.find("params").type()!=json::is_array | |||
|| v.find("id").type()==json::is_undefined) | |||
{ | |||
on_error(); | |||
return; | |||
} | |||
std::string method=v.get<std::string>("method"); | |||
json::array params=v.at("params").array(); | |||
json::value id=v.at("id"); | |||
json::value resp; | |||
resp.set("id",id); | |||
resp.set("result",json::null()); | |||
resp.set("error",json::null()); | |||
bool write=true; | |||
if(method=="submit") | |||
submit=(params,resp); | |||
else if(method=="fetch") | |||
write=fetch(params,resp); | |||
else { | |||
resp.set("error","Undefined Method "+method); | |||
} | |||
if(write) { | |||
response().content_type("application/json"); | |||
response().out() << resp; | |||
release_context(last_assigned_context()); | |||
} | |||
else { | |||
assign_context(0); | |||
} | |||
} | |||
void submit(json::array const ¶ms,json::value &output) | |||
{ | |||
if(params.size()!=1 || !params[0].type()==json::is_string) { | |||
output.set("error","Invalid input parameters"); | |||
return; | |||
} | |||
} | |||
intrusive_ptr<chat_room> self() | |||
{ | |||
return this; | |||
} | |||
void update_timer() | |||
{ | |||
timer_.cancel(); | |||
} | |||
void on_timeout(bool canceled) | |||
{ | |||
if(canceled) { | |||
timer_.expires_from_now(idle); | |||
timer_.async_wait(cppcms::util::mem_bind(&stock::on_timeout,self())); | |||
} | |||
release_all_contexts(); | |||
// And Die ;) | |||
} | |||
}; | |||
*/ | |||
class stock : public cppcms::application { | |||
public: | |||
stock(cppcms::service &srv) : cppcms::application(srv),timer_(srv) | |||
@@ -25,7 +119,7 @@ public: | |||
std::cout<<"stock()"<<std::endl; | |||
async_run(); | |||
} | |||
virtual ~stock() | |||
~stock() | |||
{ | |||
std::cout<<"~stock()"<<std::endl; | |||
} | |||
@@ -48,16 +142,16 @@ private: | |||
e=request().get().end(); | |||
if(p==e || atoi(p->second.c_str()) < counter_) { | |||
response().out() << price_<<std::endl; | |||
release_context(last_assigned_context()); | |||
release_context()->async_complete_response(); | |||
return; | |||
} | |||
all_.push_back(last_assigned_context()); | |||
all_.push_back(release_context()); | |||
} | |||
void broadcast() | |||
{ | |||
for(unsigned i=0;i<all_.size();i++) { | |||
all_[i]->response().out() << counter_<<":"<<price_ << std::endl; | |||
release_context(all_[i]); | |||
all_[i]->async_complete_response(); | |||
} | |||
all_.clear(); | |||
} | |||
@@ -71,23 +165,25 @@ private: | |||
counter_ ++ ; | |||
for(unsigned i=0;i<all_.size();i++) { | |||
all_[i]->response().out() << price_ << std::endl; | |||
release_context(all_[i]); | |||
all_[i]->async_complete_response(); | |||
} | |||
all_.clear(); | |||
} | |||
} | |||
response().out() << | |||
"<html>" | |||
"<body><form action='/stock/update' method='post'> " | |||
"<input type='text' name='price' /><br/>" | |||
"<input type='submit' value='Update Price' name='submit' />" | |||
"</form></body></html>"<<std::endl; | |||
release_context(last_assigned_context()); | |||
release_context()->async_complete_response(); | |||
} | |||
int counter_; | |||
double price_; | |||
std::vector<cppcms::http::context *> all_; | |||
std::vector<cppcms::intrusive_ptr<cppcms::http::context> > all_; | |||
cppcms::aio::timer timer_; | |||
}; | |||
@@ -10,6 +10,7 @@ | |||
#include "applications_pool.h" | |||
#include "thread_pool.h" | |||
#include "url_dispatcher.h" | |||
#include "cppcms_error.h" | |||
#include <boost/bind.hpp> | |||
@@ -58,7 +59,7 @@ void context::on_request_ready(bool error) | |||
app=0; | |||
response().io_mode(http::response::asynchronous); | |||
response().make_error_response(http::response::not_found); | |||
on_response_complete(); | |||
async_complete_response(); | |||
return; | |||
} | |||
@@ -80,7 +81,7 @@ void context::dispatch(intrusive_ptr<application> app,bool syncronous) | |||
app->dispatcher().dispatch(); | |||
} | |||
catch(std::exception const &e){ | |||
if(app->last_assigned_context() && !app->response().some_output_was_written()) { | |||
if(app->get_context() && !app->response().some_output_was_written()) { | |||
app->response().make_error_response(http::response::internal_server_error,e.what()); | |||
} | |||
else { | |||
@@ -90,7 +91,26 @@ void context::dispatch(intrusive_ptr<application> app,bool syncronous) | |||
} | |||
} | |||
void context::on_response_complete() | |||
namespace { | |||
void wrapper(context::handler const &h,bool r) | |||
{ | |||
h(r ? context::operation_aborted : context::operation_completed); | |||
} | |||
} | |||
void context::async_flush_output(context::handler const &h) | |||
{ | |||
if(response().io_mode() != http::response::asynchronous) { | |||
throw cppcms_error("Can't use asynchronouse operations when I/O mode is synchronous"); | |||
} | |||
conn_->async_write_response( | |||
response(), | |||
false, | |||
boost::bind(wrapper,h,_1)); | |||
} | |||
void context::async_complete_response() | |||
{ | |||
if(response().io_mode() == http::response::asynchronous) { | |||
conn_->async_write_response( | |||
@@ -5,6 +5,8 @@ | |||
#include "hold_ptr.h" | |||
#include "intrusive_ptr.h" | |||
#include "refcounted.h" | |||
#include "callback0.h" | |||
#include "callback1.h" | |||
namespace cppcms { | |||
@@ -31,7 +33,42 @@ namespace cppcms { | |||
cppcms::service &service(); | |||
void run(); | |||
void on_response_complete(); | |||
typedef enum { | |||
operation_completed, | |||
operation_aborted | |||
} complition_type; | |||
typedef util::callback1<complition_type> handler; | |||
/// | |||
/// Send all pending output data to the client and | |||
/// finalize the connection. Note, you can't use this | |||
/// object for communication any more. | |||
/// | |||
void async_complete_response(); | |||
/// | |||
/// Send all pending data to user, when operation is complete | |||
/// call handler \a h with status. | |||
/// | |||
/// Note: if the status is operation_aborted, you can't use | |||
/// this connection any more, the peer gone. | |||
/// | |||
void async_flush_output(handler const &h); | |||
/// | |||
/// Set handler for peer reset events. It is useful to cleanup | |||
/// connections that had timeout or just disconnected by user | |||
/// | |||
/// Notes: | |||
/// 1. if async_complete_response was called, handler would not | |||
/// be called any more. | |||
/// 2. If async_flush_output fails, this does not mean that | |||
/// this handler would be called as well, so you need to check both | |||
/// | |||
void async_on_peer_reset(util::callback0 const &h); | |||
private: | |||
void on_request_ready(bool error); | |||
static void dispatch(intrusive_ptr<application> app,bool syncronous); | |||