Browse Source

More asynchronous code

master
Artyom Beilis 15 years ago
parent
commit
b3cd325c1f
7 changed files with 293 additions and 111 deletions
  1. +35
    -15
      application.cpp
  2. +6
    -1
      application.h
  3. +158
    -84
      applications_pool.cpp
  4. +14
    -3
      applications_pool.h
  5. +1
    -1
      config.txt
  6. +71
    -1
      hello_world.cpp
  7. +8
    -6
      http_context.cpp

+ 35
- 15
application.cpp View File

@@ -84,6 +84,12 @@ http::context &application::context()
return *root()->d->conn;
}

http::context *application::last_assigned_context()
{
return &*root()->d->conn;
}


cppcms::locale::environment &application::locale()
{
return context().locale();
@@ -98,6 +104,10 @@ char const *application::ngt(char const *s,char const *p,int n)
{
return locale().ngt(s,p,n);
}
bool application::is_asynchronous()
{
return pool_id() < 0;
}

void application::assign_context(intrusive_ptr<http::context> conn)
{
@@ -161,17 +171,25 @@ void application::assign(application *app,std::string regex,int part)
add(*app,regex,part);
}

void application::release_context(http::context *conn)
{
intrusive_ptr<http::context> context=conn;
context->response().out() << std::flush;
context->response().finalize();
context->service().post(boost::bind(&http::context::on_response_complete,context));
root()->d->all_conn.erase(context);
}

void application::release_all_contexts()
{
d->conn=0;
for(data::all_conn_type::iterator p=d->all_conn.begin();p!=d->all_conn.end();++p) {
root()->d->conn=0;
for(data::all_conn_type::iterator p=root()->d->all_conn.begin();p!=root()->d->all_conn.end();++p) {
intrusive_ptr<http::context> context=*p;
context->response().out() << std::flush;
context->response().finalize();
context->service().post(boost::bind(&http::context::on_response_complete,context));
}
d->all_conn.clear();
root()->d->all_conn.clear();
}

void intrusive_ptr_add_ref(application *app)
@@ -180,25 +198,27 @@ void intrusive_ptr_add_ref(application *app)
}

// REMEMBER THIS IS CALLED FROM DESTRUCTOR!!!
void intrusive_ptr_release(application *app_in)
void intrusive_ptr_release(application *app)
{
// it is called in destructors... So be very careful
try {
app_in = app_in->root();
long refs=--(app_in->refs_);
app = app->root();
long refs=--(app->refs_);
if(refs > 0)
return;
std::auto_ptr<application> app(app_in);
app_in=0;

app->release_all_contexts();

// return the application to pool... or delete it if "pooled"
if(app->pool_id() >= 0) {
cppcms::service &service=app->service();
service.applications_pool().put(app);
cppcms::service &service=app->service();
try {
app->release_all_contexts();
}
catch(...) {
// we must unassign it in order to make sure it
// would not be accessed again
if(app->pool_id() < 0)
service.applications_pool().put(app);
throw;
}
// return the application to pool... or delete it if "pooled"
}
catch(...)
{


+ 6
- 1
application.h View File

@@ -55,9 +55,14 @@ namespace cppcms {
application *parent();
application *root();

http::context *last_assigned_context();
void release_context(http::context *conn);
void release_all_contexts();

bool is_asynchronous();

private:

void release_all_contexts();
void parent(application *parent);

void pool_id(int id);


+ 158
- 84
applications_pool.cpp View File

@@ -8,66 +8,89 @@
#include <vector>
#include <boost/regex.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

namespace cppcms {

namespace {
struct app_data : public util::noncopyable {
app_data(std::string script,std::auto_ptr<applications_pool::factory> f) :
script_name(script),
match(0),
use_regex(0),
factory(f),
size(0)
{
check();
}
app_data(std::string script,std::string pat,int select,std::auto_ptr<applications_pool::factory> f) :
script_name(script),
expr(pat),
match(select),
use_regex(1),
factory(f),
size(0)
struct applications_pool::basic_app_data : public util::noncopyable {
basic_app_data(std::string script) :
script_name(script),
match(0),
use_regex(0)
{
}
basic_app_data(std::string script,std::string pat,int select) :
script_name(script),
expr(pat),
match(select),
use_regex(1)
{
}
std::string script_name;
boost::regex expr;
int match;
bool use_regex;
private:
void check() const
{
if( !script_name.empty()
&& script_name[0]!='/'
&& script_name[0]!='.'
&& script_name!="*")
{
check();
throw cppcms_error("Scipt name should be either '*', start with '.' or '/' or be empty");
}
}
};
struct applications_pool::app_data : public applications_pool::basic_app_data {
app_data(std::string script,std::auto_ptr<applications_pool::factory> f) :
basic_app_data(script),
factory(f),
size(0)
{
}
app_data(std::string script,std::string pat,int select,std::auto_ptr<applications_pool::factory> f) :
basic_app_data(script,pat,select),
factory(f),
size(0)
{
}

std::string script_name;
boost::regex expr;
int match;
bool use_regex;
std::auto_ptr<applications_pool::factory> factory;

int size;
std::set<application *> pool;
std::auto_ptr<applications_pool::factory> factory;

~app_data()
{
std::set<application *>::iterator p;
for(p=pool.begin();p!=pool.end();++p) {
delete *p;
}
}
int size;
std::set<application *> pool;

void check() const
{
if( !script_name.empty()
&& script_name[0]!='/'
&& script_name[0]!='.'
&& script_name!="*")
{
throw cppcms_error("Scipt name should be either '*', start with '.' or '/' or be empty");
}
~app_data()
{
std::set<application *>::iterator p;
for(p=pool.begin();p!=pool.end();++p) {
delete *p;
}
};
}

}
};
struct applications_pool::long_running_app_data : public applications_pool::basic_app_data
{
long_running_app_data(std::string script) :
basic_app_data(script)
{
}
long_running_app_data(std::string script,std::string pat,int select) :
basic_app_data(script,pat,select)
{
}
};

struct applications_pool::data {
std::vector<boost::shared_ptr<app_data> > apps;
typedef std::map<application *,boost::shared_ptr<long_running_app_data> > long_running_aps_type;
long_running_aps_type long_running_aps;
int limit;
boost::mutex mutex;
};
typedef boost::unique_lock<boost::mutex> lock_it;


applications_pool::applications_pool(service &srv,int limit) :
@@ -79,77 +102,128 @@ applications_pool::applications_pool(service &srv,int limit) :
applications_pool::~applications_pool()
{
}

std::string applications_pool::script_name()
{
return srv_->settings().str("service.default_script_name","*");
}

void applications_pool::mount(std::auto_ptr<factory> aps)
{
std::string script_name=srv_->settings().str("service.default_script_name","*");
d->apps.push_back(boost::shared_ptr<app_data>(new app_data(script_name,aps)));
lock_it lock(d->mutex);
d->apps.push_back(boost::shared_ptr<app_data>(new app_data(script_name(),aps)));
}
void applications_pool::mount(std::auto_ptr<factory> aps,std::string path_info,int select)
{
std::string script_name=srv_->settings().str("service.default_script_name","*");
d->apps.push_back(boost::shared_ptr<app_data>(new app_data(script_name,path_info,select,aps)));
lock_it lock(d->mutex);
d->apps.push_back(boost::shared_ptr<app_data>(new app_data(script_name(),path_info,select,aps)));
}
void applications_pool::mount(std::auto_ptr<factory> aps,std::string script_name)
{
lock_it lock(d->mutex);
d->apps.push_back(boost::shared_ptr<app_data>(new app_data(script_name,aps)));
}
void applications_pool::mount(std::auto_ptr<factory> aps,std::string script_name,std::string path_info,int select)
{
lock_it lock(d->mutex);
d->apps.push_back(boost::shared_ptr<app_data>(new app_data(script_name,path_info,select,aps)));
}

void applications_pool::mount(application *app)
{
lock_it lock(d->mutex);
d->long_running_aps[app]=boost::shared_ptr<long_running_app_data>(new long_running_app_data(script_name()));
}
void applications_pool::mount(application *app,std::string path_info,int select)
{
lock_it lock(d->mutex);
d->long_running_aps[app]=
boost::shared_ptr<long_running_app_data>(new long_running_app_data(script_name(),path_info,select));
}
void applications_pool::mount(application *app,std::string script_name)
{
lock_it lock(d->mutex);
d->long_running_aps[app]=boost::shared_ptr<long_running_app_data>(new long_running_app_data(script_name));
}
void applications_pool::mount(application *app,std::string script_name,std::string path_info,int select)
{
lock_it lock(d->mutex);
d->long_running_aps[app]=
boost::shared_ptr<long_running_app_data>(new long_running_app_data(script_name,path_info,select));
}

std::auto_ptr<application> applications_pool::get(std::string script_name,std::string path_info,std::string &matched)
bool applications_pool::matched(basic_app_data &data,std::string script_name,std::string path_info,std::string &matched)
{
for(unsigned i=0;i<d->apps.size();i++) {
std::string const expected_name=d->apps[i]->script_name;
if(expected_name!="*" && !expected_name.empty()) {
if(expected_name[0]=='/')
if(script_name!=expected_name)
continue;
else { // if(sn[0]=='.')
if( script_name.size() <= expected_name.size()
|| script_name.substr(script_name.size() - expected_name.size())!=expected_name)
{
continue;
}
std::string const expected_name=data.script_name;
if(expected_name!="*" && !expected_name.empty()) {
if(expected_name[0]=='/')
if(script_name!=expected_name)
return false;
else { // if(sn[0]=='.')
if( script_name.size() <= expected_name.size()
|| script_name.substr(script_name.size() - expected_name.size())!=expected_name)
{
return false;
}
}
else if(expected_name=="*" && script_name.empty())
continue;
else if(expected_name.empty() && !script_name.empty())
continue;
boost::cmatch match;
if(!d->apps[i]->use_regex) {
matched=path_info;
}
else if(boost::regex_match(path_info.c_str(),match,d->apps[i]->expr)) {
matched=match[d->apps[i]->match];
}
else {
}
else if(expected_name=="*" && script_name.empty())
return false;
else if(expected_name.empty() && !script_name.empty())
return false;
boost::cmatch match;
if(!data.use_regex) {
matched=path_info;
return true;
}
else if(boost::regex_match(path_info.c_str(),match,data.expr)) {
matched=match[data.match];
return true;
}
else {
return false;
}
}


intrusive_ptr<application> applications_pool::get(std::string script_name,std::string path_info,std::string &m)
{
lock_it lock(d->mutex);
for(unsigned i=0;i<d->apps.size();i++) {
if(!matched(*d->apps[i],script_name,path_info,m))
continue;
}

if(d->apps[i]->pool.empty()) {
std::auto_ptr<application> app=(*d->apps[i]->factory)(*srv_);
intrusive_ptr<application> app=(*d->apps[i]->factory)(*srv_).release();
app->pool_id(i);
return app;
}
d->apps[i]->size--;
std::auto_ptr<application> app(*(d->apps[i]->pool.begin()));
intrusive_ptr<application> app(*(d->apps[i]->pool.begin()));
d->apps[i]->pool.erase(app.get());
return app;
}
return std::auto_ptr<application>();
for(data::long_running_aps_type::iterator p=d->long_running_aps.begin();p!=d->long_running_aps.end();++p){
if(!matched(*p->second,script_name,path_info,m))
continue;
intrusive_ptr<application> app=p->first;
}
return 0;
}

void applications_pool::put(std::auto_ptr<application> app)
void applications_pool::put(application *app)
{
unsigned id=app->pool_id();
if(id >= d->apps.size() || d->apps[id]->size >= d->limit)
lock_it lock(d->mutex);
if(!app) return;
int id=app->pool_id();
if(id < 0) {
d->long_running_aps.erase(app);
delete app;
}
if(unsigned(id) >= d->apps.size() || d->apps[id]->size >= d->limit)
return;
d->apps[id]->pool.insert(app.release());
d->apps[id]->pool.insert(app);
d->apps[id]->size++;
}



+ 14
- 3
applications_pool.h View File

@@ -4,6 +4,7 @@
#include "defs.h"
#include "noncopyable.h"
#include "hold_ptr.h"
#include "intrusive_ptr.h"

#include <memory>
#include <string>
@@ -26,15 +27,25 @@ namespace cppcms {
void mount(std::auto_ptr<factory> aps,std::string script_name);
void mount(std::auto_ptr<factory> aps,std::string script_name,std::string path_info, int select);

std::auto_ptr<application> get(std::string script_name,std::string path_info,std::string &match);
void put(std::auto_ptr<application> app);
void mount(application *app);
void mount(application *app,std::string path_info,int select);
void mount(application *app,std::string script_name);
void mount(application *app,std::string script_name,std::string path_info, int select);

intrusive_ptr<application> get(std::string script_name,std::string path_info,std::string &match);
void put(application *app);

applications_pool(service &srv,int pool_size_limit);
~applications_pool();

private:
service *srv_;
struct basic_app_data;
struct app_data;
struct long_running_app_data;
struct data;
std::string script_name();
bool matched(basic_app_data &data,std::string script_name,std::string path_info,std::string &matched);
service *srv_;
util::hold_ptr<data> d;
};



+ 1
- 1
config.txt View File

@@ -16,7 +16,7 @@ service.api = "http" # fastcgi -- preferred API
# such the fork()+exec() time in neligable
# Recomended for debug purposes only

http.script_names = { "/hello" }
http.script_names = { "/stock" "/hello" }

# Server work mode



+ 71
- 1
hello_world.cpp View File

@@ -8,10 +8,79 @@
#include "locale_environment.h"
#include "http_context.h"
#include "format.h"
#include "aio_timer.h"
#include <sstream>
#include <stdexcept>


class stock : public cppcms::application {
public:
stock(cppcms::service &srv) : cppcms::application(srv),timer_(srv)
{
dispatcher().assign("^/price$",&stock::get,this);
dispatcher().assign("^/update$",&stock::update,this);
price_=10.3;
counter_=0;
on_timeout(true);
}
private:
void on_timeout(bool x)
{
broadcast();
timer_.expires_from_now(10);
timer_.async_wait(cppcms::util::mem_bind(&stock::on_timeout,this));
}
void get()
{
response().set_plain_text_header();
cppcms::http::request::form_type::const_iterator p=request().get().find("from"),
e=request().get().end();
if(p==e || atoi(p->second.c_str()) < counter_) {
response().out() << price_<<std::endl;
release_context(last_assigned_context());
return;
}
all_.push_back(last_assigned_context());
}
void broadcast()
{
for(unsigned i=0;i<all_.size();i++) {
all_[i]->response().out() << counter_<<":"<<price_ << std::endl;
release_context(all_[i]);
}
all_.clear();
}
void update()
{
if(request().request_method()=="POST") {
cppcms::http::request::form_type::const_iterator p=request().post().find("price"),
e=request().post().end();
if(p!=e) {
price_ = atof(p->second.c_str());
counter_ ++ ;
for(unsigned i=0;i<all_.size();i++) {
all_[i]->response().out() << price_ << std::endl;
release_context(all_[i]);
}
all_.clear();
}
}
response().out() <<
"<html>"
"<body><form action='/stock/update' method='post'> "
"<input type='text' name='price' /><br/>"
"<input type='submit' value='Update Price' name='submit' />"
"</form></body></html>"<<std::endl;
release_context(last_assigned_context());
}

int counter_;
double price_;
std::vector<cppcms::http::context *> all_;
cppcms::aio::timer timer_;
};

class hello : public cppcms::application {
public:
hello(cppcms::service &srv) :
@@ -112,7 +181,8 @@ int main(int argc,char **argv)
{
try {
cppcms::service service(argc,argv);
service.applications_pool().mount(cppcms::applications_factory<hello>());
service.applications_pool().mount(cppcms::applications_factory<hello>(),"/hello");
service.applications_pool().mount(new stock(service),"/stock");
service.run();
std::cout<<"Done..."<<std::endl;
}


+ 8
- 6
http_context.cpp View File

@@ -49,11 +49,10 @@ void context::on_request_ready(bool error)
std::string script_name = conn_->getenv("SCRIPT_NAME");
std::string matched;

intrusive_ptr<application> app = service().applications_pool().get(script_name,path_info,matched).release();
intrusive_ptr<application> app = service().applications_pool().get(script_name,path_info,matched);

url_dispatcher::dispatch_type how;
bool make_404 = (app.get() == 0)
|| ((how=app->dispatcher().dispatchable(matched))==url_dispatcher::none);
bool make_404 = !app || ((how=app->dispatcher().dispatchable(matched))==url_dispatcher::none);

if(make_404) {
app=0;
@@ -64,11 +63,14 @@ void context::on_request_ready(bool error)
}

app->assign_context(self());
bool sync = how != url_dispatcher::asynchronous;
if(sync)
bool sync = !app->is_asynchronous() && (how != url_dispatcher::asynchronous);
if(sync) {
app->service().thread_pool().post(boost::bind(&context::dispatch,app,true));
else
}
else {
response().io_mode(http::response::asynchronous);
app->service().post(boost::bind(&context::dispatch,app,false));
}
}

/* static */


Loading…
Cancel
Save