Browse Source

- Fixed some DB Backends

- Improved testing - added automatic benchmarks support
master
Artyom Beilis 12 years ago
parent
commit
3d1c748b35
4 changed files with 212 additions and 93 deletions
  1. +9
    -16
      contrib/server_side/sessions/berkeley_db/bdb.cpp
  2. +51
    -40
      contrib/server_side/sessions/cppdb/cppdb_storage.cpp
  3. +142
    -31
      contrib/server_side/sessions/storage_test.cpp
  4. +10
    -6
      contrib/server_side/sessions/tests.js

+ 9
- 16
contrib/server_side/sessions/berkeley_db/bdb.cpp View File

@@ -44,7 +44,7 @@ public:
{
try {
check(db_env_create(&env,0),"db_env_create");
check(env->open(env,directory.c_str(),DB_INIT_CDB | DB_INIT_MPOOL | DB_CREATE,0666),"db_env::open");
check(env->open(env,directory.c_str(),DB_INIT_CDB | DB_INIT_MPOOL | DB_CREATE,0666),"db_env::open");
check(db_create(&dbp, env, 0),"db_create");
check(dbp->open(dbp, NULL, "sid.db", NULL, DB_HASH, DB_CREATE, 0666),"db::open");
check(db_create(&dbp_to, env, 0),"db_create");
@@ -122,39 +122,32 @@ public:
check(dbp_to->cursor(dbp_to,0,&cur,DB_WRITECURSOR),"db::cursor");
for(;;) {
DBT data;
DBT data,key;
memset(&data,0,sizeof(data));
data.flags = DB_DBT_MALLOC;
memset(&key,0,sizeof(key));
key.flags = DB_DBT_MALLOC;
#if DB_VERSION_MAJOR * 100 + DB_VERSION_MINOR >= 406
int ret = cur->get(cur,0,&data,DB_FIRST);
#else
int ret = cur->c_get(cur,0,&data,DB_FIRST);
#endif
int ret = cur->c_get(cur,&key,&data,DB_FIRST);
if(ret == DB_NOTFOUND)
break;
if(ret!=0) {
#if DB_VERSION_MAJOR * 100 + DB_VERSION_MINOR >= 406
cur->close(cur);
#else
cur->c_close(cur);
#endif
check(ret,"dbc::get");
}
int64_t be_time;
memcpy(&be_time,data.data,8);
free(data.data);
free(key.data);
time_t to = to_big_endian(be_time);
if(to < time(0))
if(to > time(0))
break;
check(cur->c_del(cur,0),"dbc::del");
}

#if DB_VERSION_MAJOR * 100 + DB_VERSION_MINOR >= 406
cur->close(cur);
#else
cur->c_close(cur);
#endif
cur = 0;
check(dbp->sync(dbp,0),"db::sync");


+ 51
- 40
contrib/server_side/sessions/cppdb/cppdb_storage.cpp View File

@@ -1,4 +1,6 @@
#include <cppdb/frontend.h>
#include <cppdb/backend.h>
#include <cppdb/pool.h>
#include <cppcms/session_storage.h>
#include <cppcms/json.h>
#include <sstream>
@@ -21,40 +23,48 @@ public:
non_durable
};

struct options_setter;
friend struct options_setter;

void set_session_option(cppdb::session &sql)
{
switch(engine_) {
case sqlite3:
switch(transaction_mode_) {
case relaxed:
sql << "PRAGMA synchronous = NORMAL" << cppdb::exec;
break;
case non_durable:
sql << "PRAGMA synchronous = OFF" << cppdb::exec;
break;
default:
;
}
break;
case postgresql:
switch(transaction_mode_) {
case relaxed:
case non_durable:
sql << "SET SESSION synchronous_commit = OFF" << cppdb::exec;
struct options_setter {
options_setter(cppdb_storage *p);
transactivity transaction_mode;
engine_type engine;
void operator()(cppdb::session &sql) const
{
switch(engine) {
case sqlite3:
switch(transaction_mode) {
case relaxed:
sql << "PRAGMA synchronous = NORMAL" << cppdb::exec;
break;
case non_durable:
sql << "PRAGMA synchronous = OFF" << cppdb::exec;
break;
default:
;
}
break;
case postgresql:
switch(transaction_mode) {
case relaxed:
case non_durable:
sql << "SET SESSION synchronous_commit = OFF" << cppdb::exec;
break;
default:
;
}
default:
;
}
default:
;
}
}

};


void save(std::string const &sid,time_t timeout,std::string const &in)
{
cppdb::session sql(conn_str_);
set_session_option(sql);
cppdb::session sql(pool_->open(),options_setter(this));
std::istringstream ss(in);
std::istream &si = ss;
switch(engine_) {
@@ -79,8 +89,7 @@ public:

bool load(std::string const &sid,time_t &timeout,std::string &out)
{
cppdb::session sql(conn_str_);
set_session_option(sql);
cppdb::session sql(pool_->open(),options_setter(this));
cppdb::result r;
std::ostringstream ss;
std::ostream &os=ss;
@@ -102,27 +111,21 @@ public:
virtual void remove(std::string const &sid)
{
cppdb::session sql(conn_str_);
set_session_option(sql);
cppdb::session sql(pool_->open(),options_setter(this));
sql << "DELETE FROM cppdb_sessions WHERE sid = ?"
<< sid <<cppdb::exec;
}

void gc()
{
cppdb::session sql(conn_str_);
set_session_option(sql);
sql << "DELETE FROM cppdb_sessions "
"WHERE sid in "
"("
" SELECT sid FROM cppdb_sessions "
" WHERE timeout < ?"
")" << cppdb::exec;
cppdb::session sql(pool_->open(),options_setter(this));
sql << "DELETE FROM cppdb_sessions WHERE timeout < ?"
<< time(0) << cppdb::exec;
}

cppdb_storage(cppcms::json::value const &val)
{
conn_str_ = val.get<std::string>("connection_string");
std::string conn_str = val.get<std::string>("connection_string");
std::string mode = val.get("transactivity","acid");

if(mode == "acid")
@@ -136,7 +139,7 @@ public:
"valid values are acid, relaxed and non_durable");
std::string engine;

cppdb::session sql(conn_str_);
cppdb::session sql(conn_str);
std::string driver = sql.engine();
if(driver == "mysql")
engine_ = mysql;
@@ -222,6 +225,8 @@ public:
}
break;
}

pool_ = cppdb::pool::create(conn_str);
}
bool is_blocking()
{
@@ -229,11 +234,17 @@ public:
}
private:

std::string conn_str_;
cppdb::pool::pointer pool_;
transactivity transaction_mode_;
engine_type engine_;
};

cppdb_storage::options_setter::options_setter(cppdb_storage *p) :
transaction_mode(p->transaction_mode_),
engine(p->engine_)
{
}


class cppdb_factory : public cppcms::sessions::session_storage_factory {
public:


+ 142
- 31
contrib/server_side/sessions/storage_test.cpp View File

@@ -19,7 +19,10 @@
#include <cppcms/defs.h>
#include <stdexcept>
#include <sstream>
#include <booster/posix_time.h>
#include <booster/thread.h>
#include <cppcms/session_storage.h>
#include <cppcms/urandom.h>
#include <cppcms/json.h>
#include <booster/function.h>
#include <booster/shared_object.h>
@@ -27,10 +30,12 @@
#include <string.h>
#include <memory>
#include <iostream>
#include <fstream>
#include <vector>
#include <stdio.h>
#include <time.h>

#include <iomanip>
#include <stdio.h>

#define TEST(X) \
do { \
@@ -46,66 +51,173 @@ std::string bs="0123456789abcdef0123456789abcde";
void do_nothing() {}


void test(booster::shared_ptr<cppcms::sessions::session_storage> storage,booster::function<void()> callback=do_nothing)
void test(booster::shared_ptr<cppcms::sessions::session_storage> storage,cppcms::sessions::session_storage_factory &f)
{
f.gc_job();
time_t now=time(0)+3;
callback();
storage->save(bs+"1",now,"");
std::string out="xx";
time_t tout;
callback();
TEST(storage->load(bs+"1",tout,out));
TEST(out.empty());
TEST(tout==now);
callback();
storage->remove(bs+"1");
callback();
TEST(!storage->load(bs+"1",tout,out));
callback();
storage->save(bs+"1",now-4,"hello world");
callback();
TEST(!storage->load(bs+"1",tout,out));
callback();
storage->save(bs+"1",now,"hello world");
callback();
TEST(storage->load(bs+"1",tout,out));
callback();
TEST(out=="hello world");
storage->save(bs+"2",now,"x");
callback();
storage->remove(bs+"2");
callback();
TEST(storage->load(bs+"1",tout,out));
TEST(out=="hello world");
callback();
storage->remove(bs+"1");
callback();
storage->remove(bs+"2");
callback();
f.gc_job();
}


struct do_gc {
cppcms::sessions::session_storage_factory *f;
int n;
std::string get_sid()
{
cppcms::urandom_device dev;
unsigned char buf[16];
char sid[33];
dev.generate(buf,sizeof(buf));
static const char digits[17] = "0123456789abcdef";
for(int i=0;i<16;i++) {
sid[i*2] = digits[buf[i] >> 4];
sid[i*2 + 1] = digits[buf[i] & 0x0f];
}
sid[32]=0;
return sid;
}

std::string message = "this is some long long message";


booster::mutex total_inserted_lock;
int total_inserted;
booster::ptime end_point;

struct thread_runner{
thread_runner(cppcms::sessions::session_storage_factory &f) : fact_(&f)
{
expired_ = time(0) - 5;
}
void operator()() const
{
try {
int my_inserted = 0;
while(booster::ptime::now() < end_point) {
booster::shared_ptr<cppcms::sessions::session_storage> storage = fact_->get();
storage->save(get_sid(),expired_,message);
my_inserted ++;
}
{
booster::unique_lock<booster::mutex> g(total_inserted_lock);
total_inserted+=my_inserted;
}
}catch(std::exception const &e) {
fprintf(stderr,"Fail %s\n",e.what());
abort();
}
}
cppcms::sessions::session_storage_factory *fact_;
time_t expired_;
};

struct gc_runner{
gc_runner(cppcms::sessions::session_storage_factory &f) : fact_(&f)
{
}
void operator()() const
{
for(int i=0;i<n;i++) {
f->gc_job();
while(booster::ptime::now() < end_point) {
booster::ptime::sleep(booster::ptime::seconds(1));
fact_->gc_job();
}
}
cppcms::sessions::session_storage_factory *fact_;
};

int main()

void performance_test(cppcms::sessions::session_storage_factory &f,bool run_gc)
{
time_t expired = time(0) - 5;
booster::shared_ptr<cppcms::sessions::session_storage> storage = f.get();
booster::ptime start,end;

f.gc_job();
end = booster::ptime::now() + booster::ptime::from_number(1);
int count = 0;
for(count = 0;booster::ptime::now() < end ;count ++) {
f.get()->save(get_sid(),expired,message);
}
double insert_single_thread_us = 1.0 / count * 1e6;

start = booster::ptime::now();
f.gc_job();
end = booster::ptime::now();
double gc_time_us = booster::ptime::to_number(end - start) / count * 1e6;
std::vector<booster::shared_ptr<booster::thread> > threads;
int threads_no = 10;
double seconds_to_run = 5;
end_point = booster::ptime::now() + booster::ptime::from_number(seconds_to_run);
total_inserted = 0;
booster::shared_ptr<booster::thread> gc_thread;
for(int i=0;i<threads_no;i++) {
booster::shared_ptr<booster::thread> t;
t.reset(new booster::thread(thread_runner(f)));
threads.push_back(t);
}
if(run_gc)
gc_thread.reset(new booster::thread(gc_runner(f)));

for(int i=0;i<threads_no;i++)
threads[i]->join();
if(gc_thread)
gc_thread->join();

double insert_multiple_threads_us = seconds_to_run / total_inserted * 1e6;
std::cout << "Benchmarks: "<< std::endl;
std::cout << std::setprecision(1) << std::fixed;
std::cout << " Cleanup message/s: " << std::setw(10) << 1e6/gc_time_us << std::endl;
std::cout << " Inserts / s single thread: " << std::setw(10) << 1e6/insert_single_thread_us << std::endl;
std::cout << " Inserts / s multiple thread: " << std::setw(10) << 1e6/insert_multiple_threads_us << std::endl;
std::cout << " Cleanup for message (us): " << std::setw(10) << gc_time_us << std::endl;
std::cout << " Insert single thread (us): " << std::setw(10) << insert_single_thread_us << std::endl;
std::cout << " Insert multiple thread (us): " << std::setw(10) << insert_multiple_threads_us << std::endl;
}

int main(int argc,char **argv)
{
bool failed = false;
try {
cppcms::json::value v;
std::cin >> v;
if(argc==2) {
std::ifstream f(argv[1]);
if(!f) {
throw std::runtime_error("Failed to open input file");
}
f >> v;
if(!f) {
std::cerr<< "Parsing failed" << std::endl;
return 1;
}
}
else {
std::cin >> v;

if(!std::cin) {
std::cerr<< "Parsing failed" << std::endl;
return 1;
if(!std::cin) {
std::cerr<< "Parsing failed" << std::endl;
return 1;
}
}

cppcms::json::array &all=v.array();
@@ -114,6 +226,7 @@ int main()
for(size_t i=0;i<all.size() && !failed;i++) {
std::string so = v[i].get<std::string>("so");
std::string clean = v[i].get("clean","");
bool run_gc = v[i].get("run_gc",false);
if(!clean.empty()) {
system(clean.c_str());
}
@@ -127,12 +240,10 @@ int main()
try {
storage_factory.reset(gen(v[i]));
storage = storage_factory->get();
std::cout << "-- Without gc" << std::endl;
test(storage);
std::cout << "-- With gc" << std::endl;
do_gc gc = { storage_factory.get() };
test(storage,gc);
test(storage,*storage_factory);
std::cout << "-- Complete" << std::endl;
if(v[i].get("no_performance",false)==false)
performance_test(*storage_factory,run_gc);
}
catch(std::exception const &e) {
std::cerr << e.what() << std::endl;


+ 10
- 6
contrib/server_side/sessions/tests.js View File

@@ -4,6 +4,7 @@
"test" : "Sqlite3 native",
"db" : "cppdb.db",
"clean" : "rm cppdb.db",
"run_gc" : true
},
{
"so" : "./berkeley_db/libcppcms_session_bdb.so",
@@ -14,23 +15,26 @@
{
"so" : "./cppdb/libcppcms_session_cppdb.so",
"test" : "sqlite3 acid",
"connection_string" : "sqlite3:db=cppdb.db",
"connection_string" : "sqlite3:db=cppdb.db;busy_timeout=10000",
"clean" : "rm cppdb.db",
"transactivity" : "acid"
"transactivity" : "acid",
"no_performance" : true,
},
{
"so" : "./cppdb/libcppcms_session_cppdb.so",
"test" : "sqlite3 relaxed ",
"connection_string" : "sqlite3:db=cppdb.db",
"connection_string" : "sqlite3:db=cppdb.db;busy_timeout=10000",
"clean" : "rm cppdb.db",
"transactivity" : "relaxed"
"transactivity" : "relaxed",
"no_performance" : true,
},
{
"so" : "./cppdb/libcppcms_session_cppdb.so",
"test" : "sqlite3 non_durable ",
"connection_string" : "sqlite3:db=cppdb.db",
"connection_string" : "sqlite3:db=cppdb.db;busy_timeout=10000",
"clean" : "rm cppdb.db",
"transactivity" : "non_durable"
"transactivity" : "non_durable",
"no_performance" : true,
},
{
"so" : "./cppdb/libcppcms_session_cppdb.so",


Loading…
Cancel
Save