@@ -26,6 +26,9 @@ namespace cppcms { | |||
namespace booster { | |||
void CPPCMS_API intrusive_ptr_add_ref(cppcms::application *p); | |||
void CPPCMS_API intrusive_ptr_release(cppcms::application *p); | |||
namespace aio { | |||
class io_service; | |||
} | |||
} | |||
namespace cppcms { | |||
@@ -66,6 +69,28 @@ namespace cppcms { | |||
application_specific_pool(); | |||
virtual ~application_specific_pool(); | |||
/// | |||
/// Returns asynchronous application that runs at given booster::aio::io_service constext, it the application | |||
/// does not exist yet, it is created | |||
/// | |||
/// Notes: | |||
/// | |||
/// - if the application is created upon function call it would be created in the calling thread regardless if it is event loop thread or not | |||
/// - If the pool isn't mounted as asynchronous pool then cppcms_error is thrown | |||
/// - if the io_srv isn't main cppcms io_service cppcms_error is thrown | |||
/// | |||
booster::intrusive_ptr<application> asynchronous_application_by_io_service(booster::aio::io_service &io_srv,cppcms::service &srv); | |||
/// | |||
/// Returns asynchronous application that runs at given booster::aio::io_service constext, it the application | |||
/// does not exist yet NULL pointer is returned | |||
/// | |||
/// Notes: | |||
/// | |||
/// - If the pool isn't mounted as asynchronous pool then cppcms_error is thrown | |||
/// - if the io_srv isn't main cppcms io_service cppcms_error is thrown | |||
/// | |||
booster::intrusive_ptr<application> asynchronous_application_by_io_service(booster::aio::io_service &io_srv); | |||
protected: | |||
/// | |||
/// Returns newly created instance of an application, its ownership | |||
@@ -29,12 +29,17 @@ public: | |||
virtual void prepopulate(cppcms::service &srv) = 0; | |||
virtual void application_requested(cppcms::service &) {} | |||
virtual ~_policy() {} | |||
virtual booster::intrusive_ptr<application> get_async(booster::aio::io_service &,cppcms::service *srv=0); | |||
virtual booster::intrusive_ptr<application> get(cppcms::service &srv) = 0; | |||
virtual void put(application *app) = 0; | |||
application *get_new(cppcms::service &srv) { return self_->get_new(srv); } | |||
protected: | |||
application_specific_pool *self_; | |||
}; | |||
booster::intrusive_ptr<application> application_specific_pool::_policy::get_async(booster::aio::io_service &,cppcms::service *) | |||
{ | |||
throw cppcms_error("Is not implemented for synchronous application"); | |||
} | |||
class application_specific_pool::_tls_policy : public application_specific_pool::_policy { | |||
public: | |||
@@ -170,28 +175,49 @@ private: | |||
class application_specific_pool::_async_policy : public application_specific_pool::_policy{ | |||
public: | |||
_async_policy(application_specific_pool *self) : | |||
_policy(self) | |||
_policy(self), | |||
io_srv_(0) | |||
{ | |||
} | |||
virtual void prepopulate(cppcms::service &srv) | |||
{ | |||
if((self_->flags() & app::prepopulated) && !(self_->flags() & app::legacy)) { | |||
if(!app_) | |||
if(!app_) { | |||
app_ = get_new(srv); | |||
io_srv_ = &srv.get_io_service(); | |||
} | |||
} | |||
} | |||
virtual booster::intrusive_ptr<application> get(cppcms::service &srv) | |||
{ | |||
if(!app_) | |||
if(!app_) { | |||
app_ = get_new(srv); | |||
if(app_) { | |||
io_srv_ = &srv.get_io_service(); | |||
} | |||
} | |||
return app_; | |||
} | |||
virtual void put(application *) | |||
{ | |||
// SHOULD NEVER BE CALLED as when pool is destroyed and app_ is destroyed weak_ptr would be invalid | |||
} | |||
virtual booster::intrusive_ptr<application> get_async(booster::aio::io_service &io_srv,cppcms::service *srv = 0) | |||
{ | |||
if(app_) { | |||
if(&io_srv == io_srv_) | |||
return app_; | |||
else | |||
throw cppcms_error("given booster::aio::io_service isn't main event loop io_service"); | |||
} | |||
if(!srv) | |||
return 0; | |||
return get(*srv); | |||
} | |||
private: | |||
booster::intrusive_ptr<application> app_; | |||
booster::aio::io_service *io_srv_; | |||
}; | |||
class application_specific_pool::_async_legacy_policy : public application_specific_pool::_policy{ | |||
@@ -286,6 +312,16 @@ void application_specific_pool::flags(int flags) | |||
} | |||
} | |||
booster::intrusive_ptr<application> application_specific_pool::asynchronous_application_by_io_service(booster::aio::io_service &ios,cppcms::service &srv) | |||
{ | |||
return d->policy->get_async(ios,&srv); | |||
} | |||
booster::intrusive_ptr<application> application_specific_pool::asynchronous_application_by_io_service(booster::aio::io_service &ios) | |||
{ | |||
return d->policy->get_async(ios); | |||
} | |||
application *application_specific_pool::get_new(service &srv) | |||
{ | |||
application *a = new_application(srv); | |||
@@ -125,12 +125,14 @@ void context::submit_to_pool(booster::shared_ptr<application_specific_pool> pool | |||
namespace { | |||
struct dispatch_binder { | |||
void (*dispatch)(booster::intrusive_ptr<application> const &,std::string const &,bool); | |||
booster::shared_ptr<context> ctx; | |||
booster::intrusive_ptr<application> app; | |||
std::string matched; | |||
bool flag; | |||
void operator()() | |||
{ | |||
app->assign_context(ctx); | |||
dispatch(app,matched,flag); | |||
} | |||
@@ -154,11 +156,10 @@ namespace { | |||
} | |||
void context::submit_to_asynchronous_application(booster::intrusive_ptr<application> app,std::string const &matched) | |||
{ | |||
app->assign_context(self()); | |||
response().io_mode(http::response::asynchronous); | |||
dispatch_binder bd = { &context::dispatch, app,matched,false }; | |||
dispatch_binder bd = { &context::dispatch, self(), app,matched,false }; | |||
conn_->get_io_service().post(bd); | |||
} | |||
@@ -23,6 +23,9 @@ | |||
#include <booster/log.h> | |||
#include "test.h" | |||
int g_fail; | |||
#define TESTNT(x) do { if(x) break; std::cerr << "FAIL: " #x " in line: " << __LINE__ << std::endl; g_fail = 1; return; } while(0) | |||
booster::thread_specific_ptr<int> g_thread_id; | |||
booster::mutex g_id_lock; | |||
int g_thread_id_counter=1000; | |||
@@ -195,6 +198,76 @@ struct marker { | |||
std::string name_; | |||
}; | |||
struct pools { | |||
booster::weak_ptr<cppcms::application_specific_pool> sync,async; | |||
bool async_requested; | |||
pools() : async_requested(false) {} | |||
}; | |||
class sender : public cppcms::application { | |||
public: | |||
struct src_prop { | |||
bool src_async; | |||
bool src_created; | |||
bool src_to_app; | |||
}; | |||
sender(cppcms::service &srv,pools *p) : | |||
cppcms::application(srv), | |||
pools_(p), | |||
first_time_(true) | |||
{ | |||
} | |||
void main(std::string url) | |||
{ | |||
if(url!="/sender") { | |||
src_prop *p = context().get_specific<src_prop>(); | |||
TESTNT(p); | |||
response().out() << "async=" << is_asynchronous() << "\n" | |||
"src_async=" << p->src_async <<"\n" | |||
"src_created="<< p->src_created <<"\n" | |||
"src_to_app=" << p->src_to_app <<"\n" | |||
"path=" << url<<"\n" | |||
; | |||
return; | |||
} | |||
src_prop *p=new src_prop(); | |||
context().reset_specific<src_prop>(p); | |||
bool to_app = request().get("to_app")=="1"; | |||
p->src_to_app = to_app; | |||
p->src_async = is_asynchronous(); | |||
p->src_created = false; | |||
booster::shared_ptr<cppcms::application_specific_pool> tgt; | |||
if(request().get("to")=="sync") | |||
tgt = pools_->sync.lock(); | |||
else | |||
tgt = pools_->async.lock(); | |||
booster::shared_ptr<cppcms::http::context> ctx = release_context(); | |||
TESTNT(tgt); | |||
if(!to_app) { | |||
ctx->submit_to_pool(tgt,"/pool"); | |||
return; | |||
} | |||
booster::intrusive_ptr<cppcms::application> app; | |||
app =tgt->asynchronous_application_by_io_service(service().get_io_service()); | |||
if(!app) { | |||
app = tgt->asynchronous_application_by_io_service(service().get_io_service(),service()); | |||
p->src_created=true; | |||
} | |||
TESTNT(app); | |||
ctx->submit_to_asynchronous_application(app,"/app"); | |||
} | |||
private: | |||
pools *pools_; | |||
bool first_time_; | |||
}; | |||
int main(int argc,char **argv) | |||
{ | |||
try { | |||
@@ -247,6 +320,25 @@ int main(int argc,char **argv) | |||
srv.applications_pool().mount(cppcms::create_pool<tester>(),mount_point("/test"),cppcms::app::asynchronous); | |||
pools p; | |||
{ | |||
booster::shared_ptr<cppcms::application_specific_pool> tmp; | |||
srv.applications_pool().mount( | |||
(tmp=cppcms::create_pool<sender>(&p)), | |||
mount_point("/async","/sender",0), | |||
cppcms::app::asynchronous); | |||
p.async = tmp; | |||
} | |||
{ | |||
booster::shared_ptr<cppcms::application_specific_pool> tmp; | |||
srv.applications_pool().mount( | |||
(tmp=cppcms::create_pool<sender>(&p)), | |||
mount_point("/sync","/sender",0), | |||
cppcms::app::synchronous); | |||
p.sync = tmp; | |||
} | |||
srv.after_fork(thread_submitter(srv)); | |||
srv.run(); | |||
@@ -267,9 +359,12 @@ int main(int argc,char **argv) | |||
std::cerr << e.what() << std::endl; | |||
return EXIT_FAILURE; | |||
} | |||
if(run_ok) | |||
std::cout << "Ok" << std::endl; | |||
else | |||
if(run_ok && !g_fail) { | |||
std::cout << "Full Test: Ok" << std::endl; | |||
return EXIT_SUCCESS; | |||
} | |||
else { | |||
std::cout << "FAILED" << std::endl; | |||
return run_ok ? EXIT_SUCCESS : EXIT_FAILURE; | |||
return EXIT_FAILURE; | |||
} | |||
} |
@@ -281,6 +281,54 @@ def test_async_temporary(): | |||
Conn(n).get(exp404 = True) | |||
def test_send(): | |||
print "/sync/sender" | |||
r=Conn('/sync/sender?to=async&to_app=1').get() | |||
test(r["path"]=="/app") | |||
test(r["async"]==1) | |||
test(r["src_async"]==0) | |||
test(r["src_created"]==1) | |||
test(r["src_to_app"]==1) | |||
r=Conn('/sync/sender?to=async&to_app=1').get() | |||
test(r["path"]=="/app") | |||
test(r["async"]==1) | |||
test(r["src_async"]==0) | |||
test(r["src_created"]==0) | |||
test(r["src_to_app"]==1) | |||
r=Conn('/sync/sender?to=async').get() | |||
test(r["path"]=="/pool") | |||
test(r["async"]==1) | |||
test(r["src_async"]==0) | |||
test(r["src_created"]==0) | |||
test(r["src_to_app"]==0) | |||
r=Conn('/sync/sender?to=sync').get() | |||
test(r["path"]=="/pool") | |||
test(r["async"]==0) | |||
test(r["src_async"]==0) | |||
print "/async/sender" | |||
r=Conn('/async/sender?to=async&to_app=1').get() | |||
test(r["path"]=="/app") | |||
test(r["async"]==1) | |||
test(r["src_async"]==1) | |||
test(r["src_to_app"]==1) | |||
r=Conn('/async/sender?to=async').get() | |||
test(r["path"]=="/pool") | |||
test(r["async"]==1) | |||
test(r["src_async"]==1) | |||
r=Conn('/async/sender?to=sync').get() | |||
test(r["path"]=="/pool") | |||
test(r["async"]==0) | |||
test(r["src_async"]==1) | |||
test_sync() | |||
test_sync_prep() | |||
@@ -290,4 +338,5 @@ test_async() | |||
test_async_prep() | |||
test_async_legacy() | |||
test_async_temporary() | |||
test_send() | |||
print "OK" |