Browse Source

- Added several server side storages...

- Not tested yet
master
Artyom Beilis 12 years ago
parent
commit
f5bd490d1c
3 changed files with 905 additions and 0 deletions
  1. +253
    -0
      contrib/server_side/sessions/berkeley_db/bdb.cpp
  2. +274
    -0
      contrib/server_side/sessions/cppdb/cppdb_storage.cpp
  3. +378
    -0
      contrib/server_side/sessions/sqlite3/session_sqlite_storage.cpp

+ 253
- 0
contrib/server_side/sessions/berkeley_db/bdb.cpp View File

@@ -0,0 +1,253 @@
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <db.h>

#include <booster/posix_time.h>
#include <booster/thread.h>
#include <cppcms/session_storage.h>
#include <cppcms/json.h>
#include <memory>
#include <iostream>
#include <sstream>
#include <fstream>
#include <unistd.h>



extern "C" {
static int get_key(DB *, const DBT *, const DBT *pdata, DBT *skey)
{
skey->data = pdata->data;
skey->size = 8;
return 0;
}
}

class single_storage
{
public:
void throw_error(int r)
{
throw std::runtime_error("Berkeley DB CppCMS storage: " + std::string(db_strerror(r)));
}
void check(int r,char const *op)
{
if(r!=0) {
throw std::runtime_error(std::string("Berkeley DB CppCMS storage:") + op + db_strerror(r));
}
}
single_storage(std::string const &directory) : env(0),dbp(0),dbp_to(0)
{
try {
check(db_env_create(&env,0),"db_env_create");
check(env->open(env,directory.c_str(),DB_INIT_CDB | DB_INIT_MPOOL | DB_CREATE,0666),"db_env::open");
check(db_create(&dbp, env, 0),"db_create");
check(dbp->open(dbp, NULL, "sid.db", NULL, DB_HASH, DB_CREATE, 0666),"db::open");
check(db_create(&dbp_to, env, 0),"db_create");
check(dbp_to->set_flags(dbp_to, DB_DUP | DB_DUPSORT),"db::set_flags");
check(dbp_to->open(dbp_to,NULL,"timeouts.db",NULL,DB_BTREE,DB_CREATE,0666),"db::open");
check(dbp->associate(dbp, NULL, dbp_to, get_key, 0),"db::associate");
}
catch(...) {
if(dbp_to)
dbp_to->close(dbp_to,0);
if(dbp)
dbp->close(dbp,0);
if(env)
env->close(env,0);
}
}

void save(std::string const &sid,time_t timeout,std::string const &in)
{
DBT key, data;
memset(&key,0,sizeof(key));
memset(&data,0,sizeof(data));
key.data = const_cast<char *>(sid.c_str());
key.size = sid.size();
std::vector<char> d(8 + in.size());
int64_t be_time = to_big_endian(timeout);
memcpy(&d[0],&be_time,8);
memcpy(&d[0]+8,in.c_str(),in.size());
data.data = &d[0];
data.size = d.size();
int ret = dbp->put(dbp,NULL,&key,&data,0);
check(ret,"db::put");
}

bool load(std::string const &sid,time_t &timeout,std::string &out)
{
DBT key, data;
memset(&key,0,sizeof(key));
memset(&data,0,sizeof(data));
key.data = const_cast<char *>(sid.c_str());
key.size = sid.size();
data.flags = DB_DBT_MALLOC;
int ret = dbp->get(dbp,NULL,&key,&data,0);
if(ret == DB_NOTFOUND)
return false;
check(ret,"db::get");
int64_t be_time;
memcpy(&be_time,data.data,8);
time_t to = to_big_endian(be_time);
if(to < time(0)) {
free(data.data);
return false;
}
timeout = to;
out.assign((char *)(data.data)+8,data.size - 8);
free(data.data);
return true;
}
void remove(std::string const &sid)
{
DBT key;
memset(&key,0,sizeof(key));
key.data = const_cast<char *>(sid.c_str());
key.size = sid.size();
int r = dbp->del(dbp,NULL,&key,0);
if(r== DB_NOTFOUND)
return;
check(r,"db::del");
}

void gc()
{
DBC *cur = 0;
check(dbp_to->cursor(dbp_to,0,&cur,DB_WRITECURSOR),"db::cursor");
for(;;) {
DBT data;
memset(&data,0,sizeof(data));
data.flags = DB_DBT_MALLOC;
int ret = cur->get(cur,0,&data,DB_FIRST);
if(ret == DB_NOTFOUND)
break;
if(ret!=0) {
cur->close(cur);
check(ret,"dbc::get");
}
int64_t be_time;
memcpy(&be_time,data.data,8);
free(data.data);
time_t to = to_big_endian(be_time);
if(to < time(0))
break;
}

cur->close(cur);
cur = 0;
check(dbp->sync(dbp,0),"db::sync");
check(dbp_to->sync(dbp_to,0),"db::sync");
}

~single_storage()
{
dbp->close(dbp,0);
dbp_to->close(dbp_to,0);
env->close(env,0);
}
private:
static int64_t to_big_endian(int64_t val_in)
{
uint64_t val = val_in;
union { char c[8]; int64_t v; } u;
for(unsigned i=0;i<8;i++) {
u.c[i]=val >> 46;
val <<=8;
}
return u.v;
}
DB_ENV *env;
DB *dbp;
DB *dbp_to;
};

class bdb_storage : public cppcms::sessions::session_storage {
public:
void save(std::string const &sid,time_t timeout,std::string const &in)
{
ptr()->save(sid,timeout,in);
}
bool load(std::string const &sid,time_t &timeout,std::string &out)
{
return ptr()->load(sid,timeout,out);
}
void remove(std::string const &sid)
{
return ptr()->remove(sid);
}
bool is_blocking()
{
return true;
}
void gc()
{
ptr()->gc();
}
bdb_storage(std::string const &dir) : dir_(dir)
{
}
private:
single_storage *ptr()
{
single_storage *p = ptr_.get();
if(!p) {
p=new single_storage(dir_);
ptr_.reset(p);
}
return p;
}
std::string dir_;
booster::thread_specific_ptr<single_storage> ptr_;
};

///
/// \brief The factory is an interface to a factory that creates session_storage objects, it should be thread safe.
///
class bdb_factory : public cppcms::sessions::session_storage_factory {
public:
virtual booster::shared_ptr<cppcms::sessions::session_storage> get()
{
return storage_;
}

///
/// Return true if session_storage requires garbage collection - removal of expired session time-to-time
///
virtual bool requires_gc()
{
return false;
}
///
/// Actual garbage collection job (if required). If requires_gc returns true it will be called once-in-a-while to remove
/// all expired objects from the DB.
///
virtual void gc_job()
{

storage_->gc();
}
bdb_factory(std::string const &dir)
{
storage_.reset(new bdb_storage(dir));
}
private:
booster::shared_ptr<bdb_storage> storage_;
};


extern "C" {
cppcms::sessions::session_storage_factory *session_factory(cppcms::json::value const &v)
{
std::string dir = v.get<std::string>("directory");
return new bdb_factory(dir);
}
}


+ 274
- 0
contrib/server_side/sessions/cppdb/cppdb_storage.cpp View File

@@ -0,0 +1,274 @@
#include <cppdb/frontend.h>
#include <cppcms/session_storage.h>
#include <cppcms/json.h>
#include <sstream>
#include <time.h>

namespace { // anon

class cppdb_storage : public cppcms::sessions::session_storage {
public:

enum engine_type {
mysql,
sqlite3,
postgresql
};

enum transactivity {
acid,
relaxed,
non_durable
};


void set_session_option(cppdb::session &sql)
{
switch(engine_) {
case sqlite3:
switch(transaction_mode_) {
case relaxed:
sql << "PRAGMA synchronous = NORMAL" << cppdb::exec;
break;
case non_durable:
sql << "PRAGMA synchronous = OFF" << cppdb::exec;
break;
default:
;
}
break;
case postgresql:
switch(transaction_mode_) {
case relaxed:
case non_durable:
sql << "SET SESSION synchronous_commit OFF" << cppdb::exec;
break;
default:
;
}
default:
;
}
}

void save(std::string const &sid,time_t timeout,std::string const &in)
{
cppdb::session sql(conn_str_);
set_session_option(sql);
std::istringstream ss(in);
std::istream &si = ss;
switch(engine_) {
case mysql:
case sqlite3:
sql << "REPLACE INTO cppdb_sessions "
"VALUES(?,?,?) "
<< sid
<< timeout
<< si << cppdb::exec;
break;
case postgresql:
{
cppdb::transaction tr(sql);
sql << "DELETE FROM cppdb_sessions WHERE sid=?" << sid << cppdb::exec;
sql << "INSERT INTO cppdb_sessions "
"VALUES(?,?,?) " << sid << timeout << si << cppdb::exec;
tr.commit();
}
}
}

bool load(std::string const &sid,time_t &timeout,std::string &out)
{
cppdb::session sql(conn_str_);
set_session_option(sql);
cppdb::result r;
std::ostringstream ss;
std::ostream &os=ss;
r=sql<< "SELECT timeout,content FROM cppdb_sessions "
"WHERE sid = ?" << sid << cppdb::row;
if(r.empty())
return false;
time_t to;
r >> to;
if(to < time(0)) {
return false;
}
r >> os;
r.clear();
out = ss.str();
timeout = to;
return true;
}
virtual void remove(std::string const &sid)
{
cppdb::session sql(conn_str_);
set_session_option(sql);
sql << "DELETE FROM cppdb_sessions WHERE sid = ?"
<< sid <<cppdb::exec;
}

void gc()
{
cppdb::session sql(conn_str_);
set_session_option(sql);
sql << "DELETE FROM sessions "
"WHERE sid in "
"("
" SELECT sid FROM sessions "
" WHERE timeout < ?"
")" << cppdb::exec;
}

cppdb_storage(cppcms::json::value const &val)
{
conn_str_ = val.get<std::string>("connection_string");
std::string mode = val.get("transactivity","acid");

if(mode == "acid")
transaction_mode_ = acid;
else if(mode == "relaxed")
transaction_mode_ = relaxed;
else if(mode == "non_durable")
transaction_mode_ = non_durable;
else
throw std::runtime_error("session-cppdb-storage: unsupported transactivity=`"+mode + "' "
"valid values are acid, relaxed and non_durable");
std::string engine;

cppdb::session sql(conn_str_);
std::string driver = sql.engine();
if(driver == "mysql")
engine_ = mysql;
else if(driver == "sqlite3")
engine_ = sqlite3;
else if(driver == "postgresql")
engine_ = postgresql;
else
throw std::runtime_error("session-cppdb-storage: unsupported driver:" + driver);

switch(engine_) {
case sqlite3:
{
sql << "CREATE TABLE IF NOT EXISTS sessions ("
" id varchar(32) primary key not null, "
" timeout bigint not null, "
" content blob non null "
")" << cppdb::exec;
sql << "CREATE INDEX IF NOT EXISTS "
"sessions_timeout on sessions(timeout)" << cppdb::exec;
std::string ver;
sql << "SELECT sqlite_version()" << cppdb::row >> ver;
size_t pos = ver.find('.');
if(pos!=std::string::npos)
pos = ver.find('.',pos+1);
if(pos!=std::string::npos)
ver.resize(pos);
if(atof(ver.c_str()) > 3.699999) {
sql << "PRAGMA journal_mode = WAL" << cppdb::row;
}
switch(transaction_mode_) {
case relaxed:
sql << "PRAGMA synchronous=NORMAL" << cppdb::exec;
break;
case non_durable:
sql << "PRAGMA synchronous=OFF" << cppdb::exec;
break;
default:
;
}
}
break;
case mysql:
{
switch(transaction_mode_) {
case acid:
sql << "CREATE TABLE IF NOT EXISTS sessions ("
" id varchar(32) primary key not null, "
" timeout bigint not null, "
" content blob non null "
") Engine=InnoDB" << cppdb::exec;
break;
case relaxed:
case non_durable:
sql << "CREATE TABLE IF NOT EXISTS sessions ("
" id varchar(32) primary key not null, "
" timeout bigint not null, "
" content blob non null "
") Engine=MyISAM" << cppdb::exec;
break;
}
sql << "CREATE INDEX IF NOT EXISTS "
"sessions_timeout on sessions(timeout)" << cppdb::exec;
}
case postgresql:
{
cppdb::transaction tr(sql);
cppdb::result r = sql
<< "SELECT 1 FROM pg_tables "
"WHERE tablename = 'sessions' "
<<cppdb::row;
if(r.empty()) {
sql << "CREATE TABLE sessions ("
" id varchar(32) primary key not null, "
" timeout bigint not null, "
" content bytea non null "
")" << cppdb::exec;
sql << "CREATE INDEX IF NOT EXISTS "
"sessions_timeout on sessions(timeout)" << cppdb::exec;
}
tr.commit();
}
break;
}
}
bool is_blocking()
{
return true;
}
private:

std::string conn_str_;
transactivity transaction_mode_;
engine_type engine_;
};


class cppdb_factory : public cppcms::sessions::session_storage_factory {
public:
cppdb_factory(cppcms::json::value const &v)
{
storage_.reset(new cppdb_storage(v));
}
virtual booster::shared_ptr<cppcms::sessions::session_storage> get()
{
return storage_;
}

virtual bool requires_gc()
{
return true;
}
virtual void gc_job()
{
storage_->gc();
}
private:
booster::shared_ptr<cppdb_storage> storage_;
};

} // anon
#if defined(CPPCMS_WIN32)
# define STORAGE_API declspec(__dllexport)
#else
# define STORAGE_API
#endif

extern "C" {
STORAGE_API cppcms::sessions::session_storage_factory *sessions_generator(cppcms::json::value const &options)
{
return new cppdb_factory(options);
}
}



+ 378
- 0
contrib/server_side/sessions/sqlite3/session_sqlite_storage.cpp View File

@@ -0,0 +1,378 @@
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (C) 2008-2010 Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
///////////////////////////////////////////////////////////////////////////////
#include <cppcms/config.h>
#include <cppcms/session_storage.h>
#include <booster/thread.h>
#include <booster/backtrace.h>
#include <sstream>
#include <cppcms/json.h>
#include <iostream>
#include <map>
#include <time.h>

#include <sqlite3.h>

namespace {

class sql_object {
public:
struct st_guard {
st_guard(sqlite3_stmt *st) : st_(st) {}
~st_guard()
{
sqlite3_clear_bindings(st_);
sqlite3_reset(st_);
}
private:
sqlite3_stmt *st_;
};
sql_object(std::string const &db_name) :
conn_(0),
replace_(0),
remove_(0),
cleanup_(0),
load_(0)
{
if(sqlite3_open(db_name.c_str(),&conn_)!=SQLITE_OK) {
if(conn_==0)
throw std::bad_alloc();
std::string msg = sqlite3_errmsg(conn_);
sqlite3_close(conn_);
throw booster::runtime_error(msg);
}
}
void begin()
{
exec("BEGIN");
}
void commit()
{
exec("COMMIT");
}
void prepare(sqlite3_stmt *&st,char const *stmt)
{
if(st)
return;
int r;
while((r=sqlite3_prepare_v2(conn_,stmt,-1,&st,0))==SQLITE_LOCKED || r==SQLITE_BUSY)
;
if(r!=SQLITE_OK)
throw_error();
}
void replace(std::string const &sid,time_t timeout,std::string const &data)
{
prepare(replace_,"REPLACE INTO sessions values(?,?,?)");
st_guard g(replace_);
check(sqlite3_bind_text(replace_,1,sid.c_str(),sid.size(),SQLITE_STATIC)==SQLITE_OK);
check(sqlite3_bind_int64(replace_,2,timeout)==SQLITE_OK);
check(sqlite3_bind_blob(replace_,3,data.c_str(),data.size(),SQLITE_STATIC)==SQLITE_OK);
check(sqlite3_step(replace_)==SQLITE_DONE);
}
void remove(std::string const &sid)
{
prepare(remove_,"DELETE FROM sessions WHERE sid=?");
st_guard g(remove_);
check(sqlite3_bind_text(remove_,1,sid.c_str(),sid.size(),SQLITE_STATIC)==SQLITE_OK);
check(sqlite3_step(remove_)==SQLITE_DONE);
}
void cleanup(time_t time,int limit)
{
prepare(cleanup_,
"DELETE FROM sessions "
"WHERE sid in "
"("
" SELECT sid FROM sessions "
" WHERE timeout < ? LIMIT ? "
")"
);
st_guard g(cleanup_);
check(sqlite3_bind_int64(cleanup_,1,time)==SQLITE_OK);
check(sqlite3_bind_int(cleanup_,2,limit)==SQLITE_OK);
check(sqlite3_step(cleanup_)==SQLITE_DONE);
}
bool load(std::string const &sid,time_t &to,std::string &value)
{
prepare(load_,"SELECT timeout,data FROM sessions WHERE sid=?");
st_guard g(load_);
check(sqlite3_bind_text(load_,1,sid.c_str(),sid.size(),SQLITE_STATIC)==SQLITE_OK);
for(;;) {
int r = sqlite3_step(load_);
if(r == SQLITE_DONE)
return false;
if(r == SQLITE_BUSY || r==SQLITE_LOCKED ) {
sqlite3_reset(load_);
continue;
}
if(r != SQLITE_ROW)
throw_error();
break;
}
time_t got_to = sqlite3_column_int64(load_,0);
if(got_to < time(0))
return false;
to = got_to;
char const *data = static_cast<char const *>(sqlite3_column_blob(load_,1));
size_t size = sqlite3_column_bytes(load_,1);
value.assign(data,size);
return true;
}
void exec(char const *stmt)
{
sqlite3_stmt *st = 0;
prepare(st,stmt);
int r = sqlite3_step(st);
sqlite3_finalize(st);
if(r!=SQLITE_DONE && r!=SQLITE_ROW) {
throw_error();
}
}
~sql_object()
{
finalize(replace_);
finalize(remove_);
finalize(cleanup_);
finalize(load_);
sqlite3_close(conn_);
}

private:
void check(bool c)
{
if(!c)
throw_error();
}
void finalize(sqlite3_stmt *&st)
{
if(st) {
sqlite3_finalize(st);
st = 0;
}
}
void throw_error()
{
throw booster::runtime_error(sqlite3_errmsg(conn_));
}

sqlite3 *conn_;
sqlite3_stmt *replace_;
sqlite3_stmt *remove_;
sqlite3_stmt *cleanup_;
sqlite3_stmt *load_;
};

class sql_session_storage : public cppcms::sessions::session_storage
{
struct data {
time_t timeout;
std::string value;
data() : timeout(0) {}
data(time_t t,std::string const &v) : timeout(t), value(v) {}
};

typedef std::map<std::string,data> data_type;
typedef booster::unique_lock<booster::shared_mutex> unique_guard;
typedef booster::shared_lock<booster::shared_mutex> shared_guard;

data_type data_;
booster::shared_mutex lock_;
data_type data_in_write_;
booster::shared_mutex data_in_write_lock_;
booster::thread_specific_ptr<sql_object> sql_;
std::string conn_;

public:
sql_session_storage(std::string const &conn) : conn_(conn)
{
}

virtual void save(std::string const &sid,time_t timeout,std::string const &in)
{
unique_guard g(lock_);
data_[sid]=data(timeout,in);
}

///
/// Load session with \a sid, put its end of life time to \a timeout and return its
/// value to \a out
///
virtual bool load(std::string const &sid,time_t &timeout,std::string &out)
{
shared_guard g(lock_);
data_type::iterator p=data_.find(sid);
if(p!=data_.end()) {
if(p->second.timeout < time(0))
return false;
timeout = p->second.timeout;
out = p->second.value;
return true;
}

{
shared_guard g2(data_in_write_lock_);

p=data_in_write_.find(sid);
if(p!=data_in_write_.end()) {
if(p->second.timeout < time(0))
return false;
timeout = p->second.timeout;
out = p->second.value;
return true;
}
}

if(sql_.get()==0) {
sql_.reset(new sql_object(conn_));
}
return sql_->load(sid,timeout,out);
}
virtual void remove(std::string const &sid)
{
unique_guard g(lock_);
data_[sid]=data();
}

virtual bool is_blocking()
{
return true;
}

void gc()
{
{
unique_guard g(lock_);
{
unique_guard g2(data_in_write_lock_);
data_in_write_.swap(data_);
}
}
{
shared_guard g2(data_in_write_lock_);
sql_object sql(conn_);
sql.begin();
try {
int count = 0;
for(data_type::iterator p=data_in_write_.begin();p!=data_in_write_.end();++p) {
if(p->second.timeout == 0)
sql.remove(p->first);
else
sql.replace(p->first,p->second.timeout,p->second.value);
count ++;
}
sql.cleanup(time(0),((count * 5) + 1000));
sql.commit();
}
catch(...) {
try { sql.exec("ROLLBACK"); }catch(...){}
throw;
}
}

{
unique_guard g2(data_in_write_lock_);
data_in_write_.clear();
}
}
private:
};


///
/// \brief The factory is an interface to a factory that creates session_storage objects, it should be thread safe.
///
class factory_object : public cppcms::sessions::session_storage_factory {
public:
factory_object(std::string const &file)
{
{
sql_object sql(file);
if(sqlite3_libversion_number() < (3*1000000 + 7*1000)) {
throw booster::runtime_error("Sqlite 3.7 and above required");
}
// Very Important
sql.exec("PRAGMA journal_mode = WAL");
sql.exec("CREATE TABLE IF NOT EXISTS "
"sessions ( "
" sid varchar(32) primary key not null,"
" timeout integer not null, "
" data blob not null"
")");
sql.exec("CREATE INDEX IF NOT EXISTS "
"sessions_timeout on sessions(timeout) ");
}
storage_.reset(new sql_session_storage(file));

}
///
/// Get a pointer to session_storage. Note if the returned pointer is same for different calls
/// session_storage implementation should be thread safe.
///
virtual booster::shared_ptr<cppcms::sessions::session_storage> get()
{
return storage_;
}

///
/// Return true if session_storage requires garbage collection - removal of expired session time-to-time
///
virtual bool requires_gc()
{
return true;
}
virtual void gc_job()
{
storage_->gc();
}
///
/// Delete the object, cleanup
///
virtual ~factory_object()
{
try {
storage_->gc();
}
catch(...) {}
}
private:
booster::shared_ptr<sql_session_storage> storage_;
};

} // anon

#if defined(CPPCMS_WIN32)
# define STORAGE_API declspec(__dllexport)
#else
# define STORAGE_API
#endif

extern "C" {
STORAGE_API cppcms::sessions::session_storage_factory *session_generator(cppcms::json::value const &opt)
{
return new factory_object(opt.get<std::string>("db"));
}
} // extern "C"


Loading…
Cancel
Save