Browse Source

Now legacy apps pool create only needed - up to threads no

Added tests to the unit-test list
master
Artyom Beilis 8 years ago
parent
commit
da0f373040
8 changed files with 207 additions and 95 deletions
  1. +3
    -3
      CMakeLists.txt
  2. +7
    -4
      cppcms/applications_pool.h
  3. +82
    -11
      src/applications_pool.cpp
  4. +4
    -19
      src/http_context.cpp
  5. +1
    -2
      src/service.cpp
  6. +77
    -50
      tests/pool_test.cpp
  7. +1
    -1
      tests/pool_test.js
  8. +32
    -5
      tests/pool_test.py

+ 3
- 3
CMakeLists.txt View File

@@ -805,9 +805,9 @@ add_test(status_test
status_test "-c" "${CNF}/status_test.js"
"--test-exec=${PYTHON} ${CNF}/status_test.py")

#add_test(pool_test
# pool_test "-c" "${CNF}/pool_test.js"
# "--test-exec=${PYTHON} ${CNF}/pool_test.py")
add_test(pool_test
pool_test "-c" "${CNF}/pool_test.js"
"--test-exec=${PYTHON} ${CNF}/pool_test.py")

add_test(async_status_test
status_test "-c" "${CNF}/status_test.js" "--test-async=async"


+ 7
- 4
cppcms/applications_pool.h View File

@@ -67,9 +67,6 @@ namespace cppcms {
application_specific_pool();
virtual ~application_specific_pool();

int flags();
void flags(int f);

protected:
///
/// Returns newly created instance of an application, its ownership
@@ -77,8 +74,13 @@ namespace cppcms {
///
virtual application *new_application(service &srv) = 0;
private:
int flags();
void flags(int f);

void prepopulate(cppcms::service &srv);
void application_requested(cppcms::service &srv);
friend class applications_pool;
friend class application;
friend class http::context;
friend void booster::intrusive_ptr_release(cppcms::application *app);

@@ -94,6 +96,7 @@ namespace cppcms {
class _pool_policy;
class _async_policy;
class _async_legacy_policy;
class _legacy_pool_policy;
booster::hold_ptr<_data> d;
};

@@ -205,7 +208,7 @@ namespace cppcms {

// put is not in use any more
void put(application *app);
applications_pool(service &srv,int pool_size_limit);
applications_pool(service &srv,int unused);
~applications_pool();

/// \endcond


+ 82
- 11
src/applications_pool.cpp View File

@@ -27,6 +27,7 @@ class application_specific_pool::_policy {
public:
_policy(application_specific_pool *self) : self_(self) {}
virtual void prepopulate(cppcms::service &srv) = 0;
virtual void application_requested(cppcms::service &) {}
virtual ~_policy() {}
virtual booster::intrusive_ptr<application> get(cppcms::service &srv) = 0;
virtual void put(application *app) = 0;
@@ -70,6 +71,7 @@ public:
for(size_t i=0;i<size_;i++)
delete apps_[i];
}
virtual void application_requested(cppcms::service &) {}
virtual void prepopulate(cppcms::service &srv)
{
if((self_->flags() & app::prepopulated) && !(self_->flags() & app::legacy)) {
@@ -101,6 +103,70 @@ private:
size_t size_;
};

class application_specific_pool::_legacy_pool_policy : public application_specific_pool::_policy {
public:
_legacy_pool_policy(application_specific_pool *self,size_t n) :
_policy(self),
total_(0),
pending_(0),
size_(0),
limit_(n)
{
apps_.resize(limit_,0);
}
~_legacy_pool_policy()
{
for(size_t i=0;i<size_;i++) {
delete apps_[i];
apps_[i]=0;
}
}
virtual void application_requested(cppcms::service &srv)
{
if(total_ >= limit_)
return;
pending_++;
if(pending_ > size_) {
apps_[size_] = get_new(srv);
size_++;
total_++;
}
}
virtual void prepopulate(cppcms::service &) {}
virtual booster::intrusive_ptr<application> get(cppcms::service &)
{
booster::intrusive_ptr<application> app;
// must never happen start
if(size_ == 0)
return app;
// must never happen end
size_--;
pending_--;
app = apps_[size_];
apps_[size_] = 0;
return app;
}
virtual void put(application *app)
{
if(!app)
return;
// must never heppen start
if(size_ >= limit_)
delete app;
// must never happend end
apps_[size_++] = app;
}
private:
std::vector<application *> apps_;
size_t total_;
size_t pending_;
size_t size_;
size_t limit_;
};




class application_specific_pool::_async_policy : public application_specific_pool::_policy{
public:
_async_policy(application_specific_pool *self) :
@@ -164,6 +230,16 @@ struct application_specific_pool::_data {
booster::recursive_mutex lock;
};

void application_specific_pool::size(size_t s)
{
d->size = s;
}

void application_specific_pool::application_requested(cppcms::service &srv)
{
d->policy->application_requested(srv);
}

application_specific_pool::~application_specific_pool()
{
}
@@ -171,7 +247,6 @@ application_specific_pool::~application_specific_pool()
application_specific_pool::application_specific_pool() : d(new application_specific_pool::_data())
{
d->flags = 0;
d->size = 0;
}

int application_specific_pool::flags()
@@ -193,7 +268,7 @@ void application_specific_pool::flags(int flags)
}

if(flags == app::legacy) {
d->policy.reset(new _pool_policy(this,d->size));
d->policy.reset(new _legacy_pool_policy(this,d->size));
return;
}

@@ -209,11 +284,6 @@ void application_specific_pool::flags(int flags)
}
}

void application_specific_pool::size(size_t n)
{
d->size = n;
}

application *application_specific_pool::get_new(service &srv)
{
application *a = new_application(srv);
@@ -294,17 +364,15 @@ struct applications_pool::_data {
};
std::list<attachment> apps;
std::list<attachment> legacy_async_apps;
int legacy_limit;
int thread_count;
booster::recursive_mutex lock;
};


applications_pool::applications_pool(service &srv,int pool_size_limit) :
applications_pool::applications_pool(service &srv,int /*unused*/) :
srv_(&srv),
d(new applications_pool::_data())
{
d->legacy_limit=pool_size_limit;
d->thread_count = srv_->threads_no();
}
applications_pool::~applications_pool()
@@ -315,7 +383,7 @@ applications_pool::~applications_pool()
void applications_pool::mount(std::auto_ptr<factory> aps,mount_point const &mp)
{
booster::shared_ptr<application_specific_pool> p(new impl::legacy_sync_pool(aps));
p->size(d->legacy_limit);
p->size(d->thread_count);
p->flags(app::legacy);

booster::unique_lock<booster::recursive_mutex> lock(d->lock);
@@ -333,6 +401,7 @@ void applications_pool::mount(booster::intrusive_ptr<application> app)
void applications_pool::mount(booster::intrusive_ptr<application> app,mount_point const &mp)
{
booster::shared_ptr<application_specific_pool> p(new impl::legacy_async_pool(app));
p->size(d->thread_count);
p->flags(app::legacy | app::asynchronous);

booster::unique_lock<booster::recursive_mutex> lock(d->lock);
@@ -367,6 +436,7 @@ applications_pool::get_application_specific_pool(char const *host,char const *sc
if(!m.first)
continue;
match = m.second;
it->pool->application_requested(*srv_);
return it->pool;
}
booster::shared_ptr<application_specific_pool> result;
@@ -381,6 +451,7 @@ applications_pool::get_application_specific_pool(char const *host,char const *sc
if(!m.first)
continue;
match = m.second;
app_it->pool->application_requested(*srv_);
result = app_it->pool;
}
}


+ 4
- 19
src/http_context.cpp View File

@@ -89,12 +89,6 @@ void context::run()
}

namespace {
struct dispatcher_legacy {
void (*func)(booster::intrusive_ptr<application> const &,std::string const &,bool);
booster::intrusive_ptr<application> app;
std::string url;
void operator()() { func(app,url,true); }
};
struct dispatcher {
void (*func)(booster::shared_ptr<application_specific_pool> const &,booster::shared_ptr<context> const &,std::string const &);
booster::shared_ptr<application_specific_pool> pool;
@@ -130,11 +124,12 @@ void context::on_request_ready(bool error)
return;
}

if(pool->flags() == (app::legacy | app::synchronous) || (pool->flags() & app::op_mode_mask)!=app::synchronous) {
// synchronous legacy
if((pool->flags() & app::op_mode_mask)!=app::synchronous) {
// asynchronous
booster::intrusive_ptr<application> app = pool->get(service());

if(!app) {
BOOSTER_ERROR("cppcms") << "Cound fetch asynchronous application from pool";
response().io_mode(http::response::asynchronous);
response().make_error_response(http::response::internal_server_error);
async_complete_response();
@@ -142,17 +137,6 @@ void context::on_request_ready(bool error)
}

app->assign_context(self());

if(pool->flags() == app::legacy) {
dispatcher_legacy dt;
dt.func = &context::dispatch;
dt.app = app;
dt.url.swap(matched);
app->service().thread_pool().post(dt);
return;
}
// Don't post, as context may be reassigned
response().io_mode(http::response::asynchronous);
dispatch(app,matched,false);
return;
@@ -192,6 +176,7 @@ void context::dispatch(booster::shared_ptr<application_specific_pool> const &poo
{
booster::intrusive_ptr<application> app = pool->get(self->service());
if(!app) {
BOOSTER_ERROR("cppcms") << "Cound fetch synchronous application from a pool";
self->response().make_error_response(http::response::internal_server_error);
self->complete_response();
return;


+ 1
- 2
src/service.cpp View File

@@ -225,8 +225,7 @@ void service::setup()
impl_->sig_.reset(new io::stream_socket(*impl_->io_service_));
impl_->breaker_.reset(new io::stream_socket(*impl_->io_service_));

int apps=settings().get("service.applications_pool_size",threads_no()*2);
impl_->applications_pool_.reset(new cppcms::applications_pool(*this,apps));
impl_->applications_pool_.reset(new cppcms::applications_pool(*this,0));
impl_->views_pool_.reset(new cppcms::views::manager(settings()));
impl_->cache_pool_.reset(new cppcms::cache_pool(settings()));
impl_->session_pool_.reset(new cppcms::session_pool(*this));


+ 77
- 50
tests/pool_test.cpp View File

@@ -21,12 +21,23 @@
#include "client.h"
#include <sstream>
#include <booster/log.h>
#include "test.h"

booster::thread_specific_ptr<int> g_thread_id;
booster::mutex g_id_lock;
int g_thread_id_counter=1000;

struct thread_submitter {
thread_submitter(cppcms::service &srv) : srv_(&srv) {}
void operator()() const
{
runner r(*srv_);
booster::thread t(r);
t.detach();
}
cppcms::service *srv_;
};

void set_thread_id(int v)
{
g_thread_id.reset(new int(v));
@@ -164,60 +175,76 @@ private:
int main(int argc,char **argv)
{
try {
using cppcms::mount_point;
cppcms::service srv(argc,argv);

set_thread_id(1);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync")),
mount_point("/sync","",0),
cppcms::app::synchronous);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/work")),
mount_point("/sync","/work",0),
cppcms::app::synchronous);

srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/prepopulated")),
mount_point("/sync","/prepopulated",0),
cppcms::app::synchronous | cppcms::app::prepopulated);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/tss")),
mount_point("/sync","/tss",0),
cppcms::app::synchronous | cppcms::app::thread_specific);

srv.applications_pool().mount(
cppcms::applications_factory<unit_test>(counter::instance("/sync/legacy")),
mount_point("/sync","/legacy",0));
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/async")),
mount_point("/async","",0),
cppcms::app::asynchronous);

srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/async/prepopulated")),
mount_point("/async","/prepopulated",0),
cppcms::app::asynchronous);

booster::intrusive_ptr<cppcms::application> app = new unit_test(srv,counter::instance("/async/legacy"));
srv.applications_pool().mount(
app,
mount_point("/async","/legacy",0));

counter::instance("/async/temporary");

srv.applications_pool().mount(cppcms::create_pool<tester>(),mount_point("/test"),cppcms::app::asynchronous);
{
using cppcms::mount_point;
cppcms::service srv(argc,argv);

set_thread_id(1);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync")),
mount_point("/sync","",0),
cppcms::app::synchronous);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/work")),
mount_point("/sync","/work",0),
cppcms::app::synchronous);

srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/prepopulated")),
mount_point("/sync","/prepopulated",0),
cppcms::app::synchronous | cppcms::app::prepopulated);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/tss")),
mount_point("/sync","/tss",0),
cppcms::app::synchronous | cppcms::app::thread_specific);

srv.applications_pool().mount(
cppcms::applications_factory<unit_test>(counter::instance("/sync/legacy")),
mount_point("/sync","/legacy",0));
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/async")),
mount_point("/async","",0),
cppcms::app::asynchronous);

srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/async/prepopulated")),
mount_point("/async","/prepopulated",0),
cppcms::app::asynchronous);

booster::intrusive_ptr<cppcms::application> app = new unit_test(srv,counter::instance("/async/legacy"));
srv.applications_pool().mount(
app,
mount_point("/async","/legacy",0));

counter::instance("/async/temporary");

srv.applications_pool().mount(cppcms::create_pool<tester>(),mount_point("/test"),cppcms::app::asynchronous);

srv.after_fork(thread_submitter(srv));
srv.run();
}

srv.after_fork(submitter(srv));
srv.run();
std::cout << "Test all deleted" << std::endl;
TEST(counter::instance("/sync")->current == 0);
TEST(counter::instance("/sync/prepopulated")->current == 0);
TEST(counter::instance("/sync/tss")->current == 0);
TEST(counter::instance("/sync/legacy")->current == 0);
TEST(counter::instance("/async")->current == 0);
TEST(counter::instance("/async/prepopulated")->current == 0);
TEST(counter::instance("/async/legacy")->current == 0);
std::cout << "Done" << std::endl;
}
catch(std::exception const &e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
}
if(run_ok)
std::cout << "Ok" << std::endl;
else
std::cout << "FAILED" << std::endl;
return run_ok ? EXIT_SUCCESS : EXIT_FAILURE;
}

+ 1
- 1
tests/pool_test.js View File

@@ -4,7 +4,7 @@
"api" : "http",
"port" : 8080,
"ip" : "127.0.0.1",
"worker_threads" : 3 // 1 for ::system("python ...") and another 2 for applications, so effectively only 2 are active
"worker_threads" : 2
},
"http" : {
"script_names" : [ "/test" , "/async" , "/sync" ]


+ 32
- 5
tests/pool_test.py View File

@@ -14,7 +14,7 @@ def test(x):
if not x:
raise RuntimeError("Failed")
def now():
return datetime.datetime.now().strftime("%H:%M:%S.%f")
return " " + datetime.datetime.now().strftime("%H:%M:%S.%f")

class Conn():
num=re.compile('^[0-9]+$')
@@ -35,8 +35,6 @@ class Conn():
response = response + tmp
body = response.split('\r\n\r\n')[1]

#r=self.h.getresponse()
#body = r.read()
r={}
for s in body.split('\n'):
if s=='':
@@ -60,6 +58,7 @@ def pool_many(url,cb=None):
a[i]=None

def test_sync():
print '/sync'
st=Conn('/test/stats?id=/sync').get()
test(st["total"]==0)

@@ -85,10 +84,10 @@ def test_sync():
test(st["current"]==2)


test_sync()

def test_sync_prep():
n='/sync/prepopulated'
print n
st=Conn('/test/stats?id=' + n).get()
test(st["total"]==2)

@@ -108,10 +107,10 @@ def test_sync_prep():
test(st["total"]==2)
test(st["current"]==2)

test_sync_prep()


def test_sync_ts():
print '/sync/tss'
st=Conn('/test/stats?id=/sync/tss').get()
test(st["total"]==0)

@@ -139,4 +138,32 @@ def test_sync_ts():
test(st["total"]==2)
test(st["current"]==2)



def test_sync_legacy():
n='/sync/legacy'
print n
st=Conn('/test/stats?id=' + n).get()
test(st["total"]==0)

c1 = Conn(n+'?sleep=0.2')
c2 = Conn(n+'?sleep=0.2')
r1=c1.get()
r2=c2.get()
test(r1["app_id"]!=r2["app_id"])
test(r1["original_thread_id"]==1)
test(r2["original_thread_id"]==1)
test(r1["thread_id"]!=r2["thread_id"])
test(r1["thread_id"] >= 1000)
test(r2["thread_id"] >= 1000)

pool_many(n+'?sleep=0.2')
st=Conn('/test/stats?id=' + n).get()
test(st["total"]==2)
test(st["current"]==2)

test_sync()
test_sync_prep()
test_sync_ts()
test_sync_legacy()
print "OK"

Loading…
Cancel
Save