Browse Source

Added incomplete yet pool test

master
Artyom Beilis 8 years ago
parent
commit
1030a181ea
6 changed files with 418 additions and 10 deletions
  1. +5
    -0
      CMakeLists.txt
  2. +5
    -9
      cppcms/applications_pool.h
  3. +31
    -1
      src/applications_pool.cpp
  4. +223
    -0
      tests/pool_test.cpp
  5. +12
    -0
      tests/pool_test.js
  6. +142
    -0
      tests/pool_test.py

+ 5
- 0
CMakeLists.txt View File

@@ -658,6 +658,7 @@ endif(DISABLE_SHARED)
set(ALL_TESTS
form_test
proto_test
pool_test
disco_test
http_timeouts_test
file_server_test
@@ -804,6 +805,10 @@ add_test(status_test
status_test "-c" "${CNF}/status_test.js"
"--test-exec=${PYTHON} ${CNF}/status_test.py")

#add_test(pool_test
# pool_test "-c" "${CNF}/pool_test.js"
# "--test-exec=${PYTHON} ${CNF}/pool_test.py")

add_test(async_status_test
status_test "-c" "${CNF}/status_test.js" "--test-async=async"
"--test-exec=${PYTHON} ${CNF}/status_test.py async")


+ 5
- 9
cppcms/applications_pool.h View File

@@ -50,6 +50,7 @@ namespace cppcms {
static const int op_mode_mask = 0x000F; /// mask to select sync vs async flags

static const int thread_specific= 0x0010; ///< Make synchronous application thread specific
static const int prepopulated = 0x0020; ///< Make sure all applications are created from the beginning (ignored in thread_specific is set)
/// \cond INTERNAL
static const int legacy = 0x8000; ///< Use legacy handling of application life time when the application is created in the event loop and than dispatched as a job to a thread pool
/// \endcond
@@ -76,6 +77,7 @@ namespace cppcms {
///
virtual application *new_application(service &srv) = 0;
private:
void prepopulate(cppcms::service &srv);
friend class applications_pool;
friend class http::context;
friend void booster::intrusive_ptr_release(cppcms::application *app);
@@ -256,7 +258,7 @@ namespace cppcms {
/// Create application factory for application of type T, such as T has a constructor
/// T::T(cppcms::service &s);
///
/// \deprecated Use applications_genrator
/// \deprecated Use create_pool
///
template<typename T>
std::auto_ptr<applications_pool::factory> applications_factory()
@@ -269,7 +271,7 @@ namespace cppcms {
/// Create application factory for application of type T, such as T has a constructor
/// T::T(cppcms::service &s,P1);
///
/// \deprecated Use applications_genrator
/// \deprecated Use create_pool
///
template<typename T,typename P1>
std::auto_ptr<applications_pool::factory> applications_factory(P1 p1)
@@ -282,7 +284,7 @@ namespace cppcms {
/// Create application factory for application of type T, such as T has a constructor
/// T::T(cppcms::service &s,P1,P2);
///
/// \deprecated Use applications_genrator
/// \deprecated Use create_pool
///
template<typename T,typename P1,typename P2>
std::auto_ptr<applications_pool::factory> applications_factory(P1 p1,P2 p2)
@@ -330,8 +332,6 @@ namespace cppcms {
/// Create application application_specific_pool for application of type T, such as T has a constructor
/// T::T(cppcms::service &s);
///
/// \deprecated Use applications_genrator
///
template<typename T>
booster::shared_ptr<application_specific_pool> create_pool()
{
@@ -343,8 +343,6 @@ namespace cppcms {
/// Create application application_specific_pool for application of type T, such as T has a constructor
/// T::T(cppcms::service &s,P1);
///
/// \deprecated Use applications_genrator
///
template<typename T,typename P1>
booster::shared_ptr<application_specific_pool> create_pool(P1 p1)
{
@@ -356,8 +354,6 @@ namespace cppcms {
/// Create application application_specific_pool for application of type T, such as T has a constructor
/// T::T(cppcms::service &s,P1,P2);
///
/// \deprecated Use applications_genrator
///
template<typename T,typename P1,typename P2>
booster::shared_ptr<application_specific_pool> create_pool(P1 p1,P2 p2)
{


+ 31
- 1
src/applications_pool.cpp View File

@@ -26,6 +26,7 @@ namespace cppcms {
class application_specific_pool::_policy {
public:
_policy(application_specific_pool *self) : self_(self) {}
virtual void prepopulate(cppcms::service &srv) = 0;
virtual ~_policy() {}
virtual booster::intrusive_ptr<application> get(cppcms::service &srv) = 0;
virtual void put(application *app) = 0;
@@ -51,6 +52,7 @@ public:
return;
tss_.reset(app);
}
virtual void prepopulate(cppcms::service &){}
private:
booster::thread_specific_ptr<application> tss_;
};
@@ -68,6 +70,15 @@ public:
for(size_t i=0;i<size_;i++)
delete apps_[i];
}
virtual void prepopulate(cppcms::service &srv)
{
if((self_->flags() & app::prepopulated) && !(self_->flags() & app::legacy)) {
while(size_ < apps_.size()) {
size_++;
apps_[size_-1]= get_new(srv);
}
}
}
virtual booster::intrusive_ptr<application> get(cppcms::service &srv)
{
if(size_ == 0)
@@ -92,7 +103,17 @@ private:

class application_specific_pool::_async_policy : public application_specific_pool::_policy{
public:
_async_policy(application_specific_pool *self) : _policy(self) {}
_async_policy(application_specific_pool *self) :
_policy(self)
{
}
virtual void prepopulate(cppcms::service &srv)
{
if((self_->flags() & app::prepopulated) && !(self_->flags() & app::legacy)) {
if(!app_)
app_ = get_new(srv);
}
}
virtual booster::intrusive_ptr<application> get(cppcms::service &srv)
{
if(!app_)
@@ -114,6 +135,7 @@ public:
app_(0)
{
}
virtual void prepopulate(cppcms::service &) {}
virtual booster::intrusive_ptr<application> get(cppcms::service &srv)
{
if(self_->flags()==-1)
@@ -224,6 +246,11 @@ booster::intrusive_ptr<application> application_specific_pool::get(cppcms::servi
return app;
}

void application_specific_pool::prepopulate(cppcms::service &srv)
{
d->policy->prepopulate(srv);
}

namespace impl {
class legacy_sync_pool : public application_specific_pool {
public:
@@ -294,6 +321,7 @@ void applications_pool::mount(std::auto_ptr<factory> aps,mount_point const &mp)
booster::unique_lock<booster::recursive_mutex> lock(d->lock);
d->apps.push_back(_data::attachment(p,mp));
}

void applications_pool::mount(std::auto_ptr<factory> aps)
{
mount(aps,mount_point());
@@ -318,6 +346,8 @@ void applications_pool::mount(booster::shared_ptr<application_specific_pool> gen
}
gen->size(d->thread_count);
gen->flags(flags);
if(flags & app::prepopulated)
gen->prepopulate(*srv_);
booster::unique_lock<booster::recursive_mutex> lock(d->lock);
d->apps.push_back(_data::attachment(gen,point));
}


+ 223
- 0
tests/pool_test.cpp View File

@@ -0,0 +1,223 @@
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (C) 2008-2012 Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>
//
// See accompanying file COPYING.TXT file for licensing details.
//
///////////////////////////////////////////////////////////////////////////////
#include <cppcms/service.h>
#include <cppcms/application.h>
#include <cppcms/applications_pool.h>
#include <cppcms/http_request.h>
#include <cppcms/http_response.h>
#include <cppcms/http_context.h>
#include <cppcms/url_dispatcher.h>
#include <cppcms/mount_point.h>
#include <booster/aio/deadline_timer.h>
#include <booster/posix_time.h>
#include <cppcms/json.h>
#include <booster/thread.h>
#include <iostream>
#include "client.h"
#include <sstream>
#include <booster/log.h>


booster::thread_specific_ptr<int> g_thread_id;
booster::mutex g_id_lock;
int g_thread_id_counter=1000;

void set_thread_id(int v)
{
g_thread_id.reset(new int(v));
}

int get_thread_id()
{
if(g_thread_id.get()==0) {
booster::unique_lock<booster::mutex> guard(g_id_lock);
int new_id = ++g_thread_id_counter;
set_thread_id(new_id);
}
return *g_thread_id;
}

class counter {
typedef booster::unique_lock<booster::mutex> guard;
counter() :
current(0),
max(0),
total(0)
{
}
public:
static counter *instance(std::string const &name)
{
static const int max = 20;
static int curr;
static counter all[max];
for(int i=0;i<curr;i++)
if(all[i].name==name)
return all + i;
assert(curr < max);
all[curr].name = name;
return all + curr++;
}

void print(std::ostream &out)
{
guard g(lock_);
out<< "name="<<name<<"\n"
"current="<<current<<"\n"
"total="<<total<<"\n"
"max="<<max;

}

std::string name;
int current;
int max;
int total;

int inc() {
guard g(lock_);
int r = ++total;
current++;
if(max < current)
max = current;
return r;
}
void dec()
{
guard g(lock_);
current--;
}

private:
booster::mutex lock_;
};



class unit_test : public cppcms::application {
public:
unit_test(cppcms::service &s,counter *c) :
cppcms::application(s),
c_(c),
id_(0),
original_thread_id_(0)
{
id_ = c_->inc();
original_thread_id_ = get_thread_id();
}
~unit_test()
{
c_->dec();
}
void main(std::string path)
{
double sleep_for = atof(request().get("sleep").c_str());

BOOSTER_DEBUG("cppcms") << "--------- GOT " << path << " for " << sleep_for << " from " << id_ << " in " << get_thread_id();

booster::ptime::sleep(booster::ptime::from_number(sleep_for));


std::ostringstream ss;
ss <<
"url=" << request().script_name() << path << "\n"
"thread_id=" << get_thread_id() << "\n"
"original_thread_id=" << original_thread_id_ << "\n"
"app_id="<<id_;
BOOSTER_DEBUG("cppcms") << "RESPONSE " << path << " from " << id_ << "\n" << ss.str() ;
response().out() << ss.str();

}
private:
counter *c_;
int id_;
int original_thread_id_;
};

class tester : public cppcms::application {
public:
tester(cppcms::service &srv) : cppcms::application(srv) {}
void main(std::string name)
{
if(name=="/stats")
counter::instance(request().get("id"))->print(response().out());
else if(name=="/install") {
app_ = new unit_test(service(),counter::instance("/async/temporary"));
service().applications_pool().mount(app_,cppcms::mount_point("/async","/temporary",0));
}
else if(name=="/uninstall") {
app_ = 0;
}

}
private:
booster::intrusive_ptr<cppcms::application> app_;
};


int main(int argc,char **argv)
{
try {
using cppcms::mount_point;
cppcms::service srv(argc,argv);

set_thread_id(1);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync")),
mount_point("/sync","",0),
cppcms::app::synchronous);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/work")),
mount_point("/sync","/work",0),
cppcms::app::synchronous);

srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/prepopulated")),
mount_point("/sync","/prepopulated",0),
cppcms::app::synchronous | cppcms::app::prepopulated);
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/sync/tss")),
mount_point("/sync","/tss",0),
cppcms::app::synchronous | cppcms::app::thread_specific);

srv.applications_pool().mount(
cppcms::applications_factory<unit_test>(counter::instance("/sync/legacy")),
mount_point("/sync","/legacy",0));
srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/async")),
mount_point("/async","",0),
cppcms::app::asynchronous);

srv.applications_pool().mount(
cppcms::create_pool<unit_test>(counter::instance("/async/prepopulated")),
mount_point("/async","/prepopulated",0),
cppcms::app::asynchronous);

booster::intrusive_ptr<cppcms::application> app = new unit_test(srv,counter::instance("/async/legacy"));
srv.applications_pool().mount(
app,
mount_point("/async","/legacy",0));

counter::instance("/async/temporary");

srv.applications_pool().mount(cppcms::create_pool<tester>(),mount_point("/test"),cppcms::app::asynchronous);

srv.after_fork(submitter(srv));
srv.run();
}
catch(std::exception const &e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
}
return run_ok ? EXIT_SUCCESS : EXIT_FAILURE;
}

+ 12
- 0
tests/pool_test.js View File

@@ -0,0 +1,12 @@

{
"service" : {
"api" : "http",
"port" : 8080,
"ip" : "127.0.0.1",
"worker_threads" : 3 // 1 for ::system("python ...") and another 2 for applications, so effectively only 2 are active
},
"http" : {
"script_names" : [ "/test" , "/async" , "/sync" ]
}
}

+ 142
- 0
tests/pool_test.py View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python
# coding=utf-8
#
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
import httplib
import sys
import re
import time
import datetime
import socket

def test(x):
if not x:
raise RuntimeError("Failed")
def now():
return datetime.datetime.now().strftime("%H:%M:%S.%f")

class Conn():
num=re.compile('^[0-9]+$')
def __init__(self,path):
self.path = path
print now(),'GET',path
self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM);
self.s.connect(('127.0.0.1',8080))
self.s.send('GET ' + path + ' HTTP/1.0\r\n\r\n')
def get(self):
print now(),'READ ',self.path
response = ''
while True:
tmp=self.s.recv(1000)
if len(tmp) == 0:
self.s.close()
break
response = response + tmp
body = response.split('\r\n\r\n')[1]

#r=self.h.getresponse()
#body = r.read()
r={}
for s in body.split('\n'):
if s=='':
break
ss=s.split('=')
if Conn.num.match(ss[1]):
r[ss[0]]=int(ss[1])
else:
r[ss[0]]=ss[1]
print now(), "Got",r
return r

def pool_many(url,cb=None):
a=[None]*10
for i in xrange(0,len(a)):
a[i]=Conn(url)
for i in xrange(0,len(a)):
r=a[i].get()
if cb:
cb(r)
a[i]=None

def test_sync():
st=Conn('/test/stats?id=/sync').get()
test(st["total"]==0)

c1 = Conn('/sync?sleep=0.5') # T1
time.sleep(0.1)
c2 = Conn('/sync/work?sleep=1.0') # T2
time.sleep(0.1)
c3 = Conn('/sync/work?sleep=1.0') # T1
time.sleep(0.1)
r1 = c1.get()
test(r1['original_thread_id'] == r1['thread_id'])
test(r1['app_id']==1)
c4 = Conn('/sync')
r4 = c4.get()
test(r4['app_id']==1)
test(r4['thread_id']!=r4['original_thread_id'])
r2 = c2.get()
r3 = c3.get()

pool_many('/sync?sleep=0.2')
st=Conn('/test/stats?id=/sync').get()
test(st["total"]==2)
test(st["current"]==2)


test_sync()

def test_sync_prep():
n='/sync/prepopulated'
st=Conn('/test/stats?id=' + n).get()
test(st["total"]==2)

c1 = Conn(n+'?sleep=0.2')
c2 = Conn(n+'?sleep=0.2')
r1=c1.get()
r2=c2.get()
test(r1["app_id"]!=r2["app_id"])
test(r1["original_thread_id"]==1)
test(r2["original_thread_id"]==1)
test(r1["thread_id"]!=r2["thread_id"])
test(r1["thread_id"] >= 1000)
test(r2["thread_id"] >= 1000)

pool_many(n+'?sleep=0.2')
st=Conn('/test/stats?id=' + n).get()
test(st["total"]==2)
test(st["current"]==2)

test_sync_prep()


def test_sync_ts():
st=Conn('/test/stats?id=/sync/tss').get()
test(st["total"]==0)

c1 = Conn('/sync/tss?sleep=0.5') # T1
time.sleep(0.1)
c2 = Conn('/sync/work?sleep=1.0') # T2
time.sleep(0.1)
c3 = Conn('/sync/work?sleep=1.0') # T1
time.sleep(0.1)
r1 = c1.get()
test(r1['original_thread_id'] == r1['thread_id'])
test(r1['app_id']==1)
c4 = Conn('/sync/tss')
r4 = c4.get()
test(r4['app_id']==2)
test(r4['thread_id']==r4['original_thread_id'])
r2 = c2.get()
r3 = c3.get()

def cb(r):
test(r['thread_id']==r['original_thread_id'])

pool_many('/sync/tss?sleep=0.2',cb)
st=Conn('/test/stats?id=/sync/tss').get()
test(st["total"]==2)
test(st["current"]==2)

test_sync_ts()

Loading…
Cancel
Save