Browse Source

Disabled prefork-support (meanwhile)

master
Artyom Beilis 14 years ago
parent
commit
449546ccc6
2 changed files with 423 additions and 0 deletions
  1. +420
    -0
      acceptor.patch
  2. +3
    -0
      service.cpp

+ 420
- 0
acceptor.patch View File

@@ -0,0 +1,420 @@
Index: proc_acceptor.cpp
===================================================================
--- proc_acceptor.cpp (revision 1153)
+++ proc_acceptor.cpp (working copy)
@@ -1,5 +1,6 @@
#define CPPCMS_SOURCE
#include "proc_acceptor.h"
+#include <iostream>
namespace cppcms { namespace impl {
process_shared_acceptor::process_shared_acceptor(cppcms::service &srv)
@@ -12,15 +13,30 @@
service_=&srv.impl().get_io_service();
srv.after_fork(boost::bind(&process_shared_acceptor::on_fork,this));
}
+
+ void process_shared_acceptor::start()
+ {
+ worker_.reset(new boost::thread(boost::bind(&process_shared_acceptor::run,this)));
+ }
+
+ void process_shared_acceptor::stop()
+ {
+ stop_ = true;
+ wake();
+ worker_->join();
+ }
void process_shared_acceptor::on_fork()
{
int fds[2];
- pipe(fds);
+ if(::pipe(fds) < 0) {
+ throw cppcms_error(errno,"Creation of pipe failed");
+ }
break_fd_=fds[0];
wake_fd_=fds[1];
max_1_=std::max(break_fd_,wake_fd_)+1;
FD_SET(break_fd_,&wait_set);
+ start();
}
void process_shared_acceptor::wake()
{
@@ -32,6 +48,26 @@
}
}
+ struct assigner_and_caller {
+ function<void(int)> assigner;
+ function<void(boost::system::error_code const &e)> callback;
+ int fd;
+ boost::system::error_code e;
+ void operator()()
+ {
+ std::cout << "Assigning and calling" << std::endl;
+ try {
+ if(!assigner.empty())
+ assigner(fd);
+ callback(e);
+ }
+ catch(std::exception const &e)
+ {
+ std::cerr << "Assigning and calling failed " << e.what() << std::endl;
+ }
+ }
+ };
+
bool process_shared_acceptor::run_one()
{
fd_set rd;
@@ -42,7 +78,9 @@
int n;
{
mutex::guard guard(process_mutex_);
+ std::cout << "Selecting " << getpid() << std::endl;
n=::select(max_1_,&rd,0,0,0);
+ std::cout << "Selected " << n << std::endl;
if(n < 0 && errno==EINTR)
return true;
if(n < 0)
@@ -63,19 +101,22 @@
int fd=::accept(afd,0,0);
int err=errno;
+ std::cout << "Accepted " << afd << " as " << fd << " in " << getpid() <<std::endl;
+
thread_guard tguard(thread_mutex_);
- boost::system::error_code e;
- if(fd > 0) {
- assigners_[afd](fd);
- assigners_[afd]=assign_function();
+ assigner_and_caller anc;
+ if(fd >= 0) {
+ anc.assigner.swap(assigners_[afd]);
+ anc.fd=fd;
}
else {
- e=boost::system::error_code(err,boost::system::errno_ecat);
+ anc.e=boost::system::error_code(err,boost::system::errno_ecat);
}
- service_->post(boost::bind(callbacks_[afd],e));
- callbacks_[afd]=on_accept_function();
+ anc.callback.swap(callbacks_[afd]);
FD_CLR(afd,&wait_set);
+ std::cout << "Posting " << std::endl;
+ service_->post(anc);
}
}
}
@@ -83,8 +124,10 @@
}
void process_shared_acceptor::run()
{
+ std::cout << "Running " << getpid() << std::endl;
while(run_one())
;
+ std::cout << "Done " << getpid() << std::endl;
}
process_shared_acceptor::~process_shared_acceptor()
Index: service.cpp
===================================================================
--- service.cpp (revision 1153)
+++ service.cpp (working copy)
@@ -49,7 +49,11 @@
#include "views_pool.h"
#include "session_pool.h"
+#ifndef CPPCMS_WIN32
+#include "proc_acceptor.h"
+#endif
+
#ifdef CPPCMS_POSIX
#include <sys/wait.h>
#endif
@@ -304,6 +308,17 @@
#endif
}
+void service::reset_service()
+{
+ #ifndef CPPCMS_WIN32
+ std::auto_ptr<boost::asio::io_service> new_service(new boost::asio::io_service());
+ for(unsigned i=0;i<impl_->acceptors_.size();i++)
+ impl_->acceptors_[i]=impl_->acceptors_[i]->clone(*new_service);
+ impl_->io_service_ = new_service;
+
+ #endif
+}
+
void service::after_fork(function<void()> const &cb)
{
impl_->on_fork_.push_back(cb);
@@ -313,27 +328,46 @@
{
generator();
session_pool().init();
+
+ #ifndef CPPCMS_WIN32
+ //if(procs_no() > 1) {
+ impl_->process_shared_acceptor_.reset(new impl::process_shared_acceptor(*this));
+ //}
+ #endif
+
start_acceptor();
if(settings().get("file_server.enable",false))
applications_pool().mount(applications_factory<cppcms::impl::file_server>(),"");
+ std::cout << "FORKING" << std::endl;
+
if(prefork()) {
return;
}
thread_pool(); // make sure we start it
+
for(unsigned i=0;i<impl_->on_fork_.size();i++)
impl_->on_fork_[i]();
impl_->on_fork_.clear();
+
+ #ifndef CPPCMS_WIN32
+ //if(procs_no() > 1) {
+ reset_service();
+ //}
+ #endif
for(unsigned i=0;i<impl_->acceptors_.size();i++)
impl_->acceptors_[i]->async_accept();
setup_exit_handling();
+
+ std::cout << "LOOPING" << std::endl;
impl_->get_io_service().run();
+ std::cout << "Exitting " << std::endl;
}
int service::procs_no()
@@ -668,6 +702,12 @@
service::~service()
{
acceptors_.clear();
+ #ifndef CPPCMS_WIN32
+ if(process_shared_acceptor_.get()) {
+ process_shared_acceptor_->stop();
+ process_shared_acceptor_.reset();
+ }
+ #endif
thread_pool_.reset();
sig_.reset();
breaker_.reset();
Index: cgi_api.h
===================================================================
--- cgi_api.h (revision 1153)
+++ cgi_api.h (working copy)
@@ -24,6 +24,7 @@
#include "intrusive_ptr.h"
#include <vector>
#include <map>
+#include <memory>
#include "function.h"
#include "config.h"
#ifdef CPPCMS_USE_EXTERNAL_BOOST
@@ -58,6 +59,7 @@
public:
virtual void async_accept() = 0;
virtual void stop() = 0;
+ virtual std::auto_ptr<acceptor> clone(boost::asio::io_service &) = 0;
virtual ~acceptor(){}
};
Index: cgi_acceptor.h
===================================================================
--- cgi_acceptor.h (revision 1153)
+++ cgi_acceptor.h (working copy)
@@ -32,7 +32,13 @@
# include <cppcms_boost/bind.hpp>
namespace boost = cppcms_boost;
#endif
+#ifndef CPPCMS_WIN32
+#include "proc_acceptor.h"
+#include <unistd.h>
+#endif
+
+
namespace cppcms {
namespace impl {
namespace cgi {
@@ -40,12 +46,33 @@
template<typename Proto,class ServerAPI>
class socket_acceptor : public acceptor {
public:
+ socket_acceptor(cppcms::service &srv,boost::asio::io_service &iosrv) :
+ srv_(srv),
+ acceptor_(iosrv),
+ stopped_(false)
+ {
+ }
socket_acceptor(cppcms::service &srv) :
srv_(srv),
acceptor_(srv_.impl().get_io_service()),
stopped_(false)
{
}
+ virtual std::auto_ptr<acceptor> clone(boost::asio::io_service &iosrv)
+ {
+ #ifdef CPPCMS_WIN32
+ throw cppcms_error("Internal error - cloning should not be done under Windows platform");
+ #else
+ std::auto_ptr<socket_acceptor<Proto,ServerAPI> > tmp(new socket_acceptor<Proto,ServerAPI>(srv_,iosrv));
+ int newfd = ::dup(acceptor_.native());
+ if(newfd < 0)
+ throw cppcms_error(errno,"Dup failed");
+ tmp->acceptor_.assign(acceptor_.local_endpoint().protocol(),newfd);
+ std::auto_ptr<acceptor> tmp2;
+ tmp2 = tmp;
+ return tmp2;
+ #endif
+ }
virtual void async_accept()
{
if(stopped_)
@@ -53,11 +80,24 @@
ServerAPI *api=new ServerAPI(srv_);
api_=api;
asio_socket_ = &api->socket_;
- acceptor_.async_accept(
- *asio_socket_,
- boost::bind( &socket_acceptor::on_accept,
- this,
- boost::asio::placeholders::error));
+#ifndef CPPCMS_WIN32
+ if(srv_.impl().process_shared_acceptor_.get()) {
+ srv_.impl().process_shared_acceptor_->async_accept(
+ acceptor_,
+ *asio_socket_,
+ boost::bind( &socket_acceptor::on_accept,
+ this,
+ boost::asio::placeholders::error));
+ }
+ else
+#endif
+ {
+ acceptor_.async_accept(
+ *asio_socket_,
+ boost::bind( &socket_acceptor::on_accept,
+ this,
+ boost::asio::placeholders::error));
+ }
}
virtual void stop()
{
Index: service.h
===================================================================
--- service.h (revision 1153)
+++ service.h (working copy)
@@ -80,6 +80,7 @@
void start_acceptor();
void setup_exit_handling();
bool prefork();
+ void reset_service();
util::hold_ptr<impl::service> impl_;
};
Index: service_impl.h
===================================================================
--- service_impl.h (revision 1153)
+++ service_impl.h (working copy)
@@ -31,6 +31,7 @@
class session_pool;
namespace impl {
+ class process_shared_acceptor;
namespace cgi {
class acceptor;
}
@@ -44,6 +45,9 @@
return *io_service_;
}
+ #ifndef CPPCMS_WIN32
+ std::auto_ptr<process_shared_acceptor> process_shared_acceptor_;
+ #endif
private:
friend class cppcms::service;
Index: CMakeLists.txt
===================================================================
--- CMakeLists.txt (revision 1153)
+++ CMakeLists.txt (working copy)
@@ -520,7 +520,7 @@
if(WIN32 AND NOT CYGWIN)
set(CPPCMS_SOURCES ${CPPCMS_SOURCES} session_win32_file_storage.cpp)
else(WIN32 AND NOT CYGWIN)
- set(CPPCMS_SOURCES ${CPPCMS_SOURCES} session_posix_file_storage.cpp)
+ set(CPPCMS_SOURCES ${CPPCMS_SOURCES} session_posix_file_storage.cpp proc_acceptor.cpp)
endif(WIN32 AND NOT CYGWIN)
if(NOT DISABLE_SHARED)
Index: proc_acceptor.h
===================================================================
--- proc_acceptor.h (revision 1153)
+++ proc_acceptor.h (working copy)
@@ -29,7 +29,7 @@
~process_shared_acceptor();
template<typename Socket1,typename Socket2>
- void async_accept(Socket1 acc,Socket2 sock,on_accept_function const &on_accepted)
+ void async_accept(Socket1 &acc,Socket2 &sock,on_accept_function const &on_accepted)
{
int fd = acc.native();
{
@@ -38,16 +38,28 @@
if(fd + 1 > max_1_ )
max_1_ = fd+1;
callbacks_[fd]=on_accepted;
- assigners_[fd]=boost::bind(&Socket2::assign,&sock,_1);
+ assigner<Socket2> as = { &sock, acc.local_endpoint().protocol() };
+ assigners_[fd]=as;
}
wake();
}
- void stop()
+ void start();
+ void stop();
+ private:
+
+ template<typename Socket>
+ struct assigner
{
- stop_ = true;
- wake();
- }
- private:
+ typedef typename Socket::protocol_type protocol_type;
+ Socket *sock;
+ protocol_type proto;
+ void operator()(int fd) const
+ {
+ sock->assign(proto,fd);
+ }
+ };
+
+
void on_fork();
void wake();
bool run_one();
@@ -64,6 +76,7 @@
std::vector<on_accept_function> callbacks_;
std::vector<assign_function> assigners_;
boost::asio::io_service *service_;
+ std::auto_ptr<boost::thread> worker_;
}
;
}

+ 3
- 0
service.cpp View File

@@ -344,6 +344,9 @@ int service::procs_no()
#ifdef CPPCMS_WIN32
if(procs > 0)
throw cppcms_error("Prefork is not supported under Windows");
#else
if(procs > 1)
throw cppcms_error("Prefork support is disabled meanwhile in CppCMS. To Be Fixed");
#endif
return procs;
}


Loading…
Cancel
Save