|
- Index: proc_acceptor.cpp
- ===================================================================
- --- proc_acceptor.cpp (revision 1153)
- +++ proc_acceptor.cpp (working copy)
- @@ -1,5 +1,6 @@
- #define CPPCMS_SOURCE
- #include "proc_acceptor.h"
- +#include <iostream>
- namespace cppcms { namespace impl {
-
- process_shared_acceptor::process_shared_acceptor(cppcms::service &srv)
- @@ -12,15 +13,30 @@
- service_=&srv.impl().get_io_service();
- srv.after_fork(boost::bind(&process_shared_acceptor::on_fork,this));
- }
- +
- + void process_shared_acceptor::start() // Launch the worker thread that executes run().
- + {
- + worker_.reset(new boost::thread(boost::bind(&process_shared_acceptor::run,this))); // worker_ owns the thread object; the thread uses `this`, stop() joins it
- + }
- +
- + void process_shared_acceptor::stop() // Request worker shutdown and wait for it to finish; safe to call when start() never ran, and safe to call twice.
- + {
- +     stop_ = true;
- +     wake(); // presumably interrupts the blocking select() in run_one() so the worker can notice stop_ - confirm against wake()
- +     if(worker_.get()) { worker_->join(); worker_.reset(); } // worker_ is null unless on_fork() called start(); service::run() returns right after prefork() without running the after-fork callbacks
- + }
-
- void process_shared_acceptor::on_fork()
- {
- int fds[2];
- - pipe(fds);
- + if(::pipe(fds) < 0) { // pipe ends are stored below: fds[0] as break_fd_, fds[1] as wake_fd_
- + throw cppcms_error(errno,"Creation of pipe failed"); // report the pipe() failure with its errno
- + }
- break_fd_=fds[0];
- wake_fd_=fds[1];
- max_1_=std::max(break_fd_,wake_fd_)+1;
- FD_SET(break_fd_,&wait_set);
- + start(); // spawn the worker thread only after the pipe and wait set are prepared
- }
- void process_shared_acceptor::wake()
- {
- @@ -32,6 +48,26 @@
- }
- }
-
- + struct assigner_and_caller { // Functor that run_one() posts to the io_service: optionally assigns the accepted fd, then invokes the accept callback.
- + function<void(int)> assigner; // taken from assigners_[afd] when accept() succeeded; left empty otherwise
- + function<void(boost::system::error_code const &e)> callback; // taken from callbacks_[afd]
- + int fd; // accepted descriptor; NOTE(review): never initialised when accept() failed (only read if assigner is non-empty)
- + boost::system::error_code e; // set from errno when accept() failed, default-constructed on success
- + void operator()()
- + {
- + std::cout << "Assigning and calling" << std::endl; // trace output
- + try {
- + if(!assigner.empty())
- + assigner(fd);
- + callback(e); // NOTE(review): skipped if assigner(fd) throws, so the handler is never notified and fd is not closed here
- + }
- + catch(std::exception const &e) // this local `e` hides the member `e` inside the handler
- + {
- + std::cerr << "Assigning and calling failed " << e.what() << std::endl; // exception is logged and swallowed
- + }
- + }
- + };
- +
- bool process_shared_acceptor::run_one()
- {
- fd_set rd;
- @@ -42,7 +78,9 @@
- int n;
- {
- mutex::guard guard(process_mutex_);
- + std::cout << "Selecting " << getpid() << std::endl;
- n=::select(max_1_,&rd,0,0,0);
- + std::cout << "Selected " << n << std::endl;
- if(n < 0 && errno==EINTR)
- return true;
- if(n < 0)
- @@ -63,19 +101,22 @@
- int fd=::accept(afd,0,0);
- int err=errno;
-
- + std::cout << "Accepted " << afd << " as " << fd << " in " << getpid() <<std::endl;
- +
- thread_guard tguard(thread_mutex_);
-
- - boost::system::error_code e;
- - if(fd > 0) {
- - assigners_[afd](fd);
- - assigners_[afd]=assign_function();
- + assigner_and_caller anc;
- + if(fd >= 0) {
- + anc.assigner.swap(assigners_[afd]);
- + anc.fd=fd;
- }
- else {
- - e=boost::system::error_code(err,boost::system::errno_ecat);
- + anc.e=boost::system::error_code(err,boost::system::errno_ecat);
- }
- - service_->post(boost::bind(callbacks_[afd],e));
- - callbacks_[afd]=on_accept_function();
- + anc.callback.swap(callbacks_[afd]);
- FD_CLR(afd,&wait_set);
- + std::cout << "Posting " << std::endl;
- + service_->post(anc);
- }
- }
- }
- @@ -83,8 +124,10 @@
- }
- void process_shared_acceptor::run()
- {
- + std::cout << "Running " << getpid() << std::endl; // trace: loop is starting, tagged with this process id
- while(run_one())
- ;
- + std::cout << "Done " << getpid() << std::endl; // trace: run_one() returned false and the loop ended
- }
-
- process_shared_acceptor::~process_shared_acceptor()
- Index: service.cpp
- ===================================================================
- --- service.cpp (revision 1153)
- +++ service.cpp (working copy)
- @@ -49,7 +49,11 @@
- #include "views_pool.h"
- #include "session_pool.h"
-
- +#ifndef CPPCMS_WIN32
- +#include "proc_acceptor.h"
- +#endif
-
- +
- #ifdef CPPCMS_POSIX
- #include <sys/wait.h>
- #endif
- @@ -304,6 +308,17 @@
- #endif
- }
-
- +void service::reset_service() // Replace the io_service with a fresh one and move every acceptor onto it; compiles to an empty body when CPPCMS_WIN32 is defined.
- +{
- + #ifndef CPPCMS_WIN32
- + std::auto_ptr<boost::asio::io_service> new_service(new boost::asio::io_service());
- + for(unsigned i=0;i<impl_->acceptors_.size();i++)
- + impl_->acceptors_[i]=impl_->acceptors_[i]->clone(*new_service); // socket_acceptor::clone() dup()s the listening fd into an acceptor built on new_service
- + impl_->io_service_ = new_service; // ownership moves to impl_; NOTE(review): process_shared_acceptor stores &srv.impl().get_io_service() in service_ - confirm it does not keep pointing at the replaced io_service
- +
- + #endif
- +}
- +
- void service::after_fork(function<void()> const &cb)
- {
- impl_->on_fork_.push_back(cb);
- @@ -313,27 +328,46 @@
- {
- generator();
- session_pool().init();
- +
- + #ifndef CPPCMS_WIN32
- + //if(procs_no() > 1) {
- + impl_->process_shared_acceptor_.reset(new impl::process_shared_acceptor(*this));
- + //}
- + #endif
- +
- start_acceptor();
-
- if(settings().get("file_server.enable",false))
- applications_pool().mount(applications_factory<cppcms::impl::file_server>(),"");
-
- + std::cout << "FORKING" << std::endl;
- +
- if(prefork()) {
- return;
- }
- thread_pool(); // make sure we start it
-
- +
- for(unsigned i=0;i<impl_->on_fork_.size();i++)
- impl_->on_fork_[i]();
-
- impl_->on_fork_.clear();
- +
- + #ifndef CPPCMS_WIN32
- + //if(procs_no() > 1) {
- + reset_service();
- + //}
- + #endif
-
- for(unsigned i=0;i<impl_->acceptors_.size();i++)
- impl_->acceptors_[i]->async_accept();
-
- setup_exit_handling();
- +
- + std::cout << "LOOPING" << std::endl;
-
- impl_->get_io_service().run();
- + std::cout << "Exitting " << std::endl;
- }
-
- int service::procs_no()
- @@ -668,6 +702,12 @@
- service::~service()
- {
- acceptors_.clear();
- + #ifndef CPPCMS_WIN32
- + if(process_shared_acceptor_.get()) {
- + process_shared_acceptor_->stop();
- + process_shared_acceptor_.reset();
- + }
- + #endif
- thread_pool_.reset();
- sig_.reset();
- breaker_.reset();
- Index: cgi_api.h
- ===================================================================
- --- cgi_api.h (revision 1153)
- +++ cgi_api.h (working copy)
- @@ -24,6 +24,7 @@
- #include "intrusive_ptr.h"
- #include <vector>
- #include <map>
- +#include <memory>
- #include "function.h"
- #include "config.h"
- #ifdef CPPCMS_USE_EXTERNAL_BOOST
- @@ -58,6 +59,7 @@
- public:
- virtual void async_accept() = 0;
- virtual void stop() = 0;
- + virtual std::auto_ptr<acceptor> clone(boost::asio::io_service &) = 0; // produce a new acceptor attached to the given io_service; the returned auto_ptr owns it
- virtual ~acceptor(){}
- };
-
- Index: cgi_acceptor.h
- ===================================================================
- --- cgi_acceptor.h (revision 1153)
- +++ cgi_acceptor.h (working copy)
- @@ -32,7 +32,13 @@
- # include <cppcms_boost/bind.hpp>
- namespace boost = cppcms_boost;
- #endif
- +#ifndef CPPCMS_WIN32
- +#include "proc_acceptor.h"
- +#include <unistd.h>
- +#endif
-
- +
- +
- namespace cppcms {
- namespace impl {
- namespace cgi {
- @@ -40,12 +46,33 @@
- template<typename Proto,class ServerAPI>
- class socket_acceptor : public acceptor {
- public:
- + socket_acceptor(cppcms::service &srv,boost::asio::io_service &iosrv) : // Construct on an explicitly supplied io_service instead of srv's own; used by clone().
- + srv_(srv),
- + acceptor_(iosrv),
- + stopped_(false)
- + {
- + }
- socket_acceptor(cppcms::service &srv) :
- srv_(srv),
- acceptor_(srv_.impl().get_io_service()),
- stopped_(false)
- {
- }
- + virtual std::auto_ptr<acceptor> clone(boost::asio::io_service &iosrv) // Returns a new acceptor on iosrv listening on a dup() of this acceptor's descriptor; throws if dup() or assign() fails.
- + {
- +     #ifdef CPPCMS_WIN32
- +     throw cppcms_error("Internal error - cloning should not be done under Windows platform");
- +     #else
- +     std::auto_ptr<socket_acceptor<Proto,ServerAPI> > tmp(new socket_acceptor<Proto,ServerAPI>(srv_,iosrv));
- +     int newfd = ::dup(acceptor_.native());
- +     if(newfd < 0)
- +         throw cppcms_error(errno,"Dup failed");
- +     boost::system::error_code ec; // use the non-throwing assign so newfd can be released on failure
- +     tmp->acceptor_.assign(acceptor_.local_endpoint().protocol(),newfd,ec);
- +     if(ec) { ::close(newfd); throw boost::system::system_error(ec); } // same exception type the throwing assign() overload raises, minus the descriptor leak
- +     return std::auto_ptr<acceptor>(tmp); // ownership moves from auto_ptr<socket_acceptor> to auto_ptr<acceptor>
- +     #endif
- + }
- virtual void async_accept()
- {
- if(stopped_)
- @@ -53,11 +80,24 @@
- ServerAPI *api=new ServerAPI(srv_);
- api_=api;
- asio_socket_ = &api->socket_;
- - acceptor_.async_accept(
- - *asio_socket_,
- - boost::bind( &socket_acceptor::on_accept,
- - this,
- - boost::asio::placeholders::error));
- +#ifndef CPPCMS_WIN32
- + if(srv_.impl().process_shared_acceptor_.get()) {
- + srv_.impl().process_shared_acceptor_->async_accept(
- + acceptor_,
- + *asio_socket_,
- + boost::bind( &socket_acceptor::on_accept,
- + this,
- + boost::asio::placeholders::error));
- + }
- + else
- +#endif
- + {
- + acceptor_.async_accept(
- + *asio_socket_,
- + boost::bind( &socket_acceptor::on_accept,
- + this,
- + boost::asio::placeholders::error));
- + }
- }
- virtual void stop()
- {
- Index: service.h
- ===================================================================
- --- service.h (revision 1153)
- +++ service.h (working copy)
- @@ -80,6 +80,7 @@
- void start_acceptor();
- void setup_exit_handling();
- bool prefork();
- + void reset_service();
- util::hold_ptr<impl::service> impl_;
- };
-
- Index: service_impl.h
- ===================================================================
- --- service_impl.h (revision 1153)
- +++ service_impl.h (working copy)
- @@ -31,6 +31,7 @@
- class session_pool;
-
- namespace impl {
- + class process_shared_acceptor;
- namespace cgi {
- class acceptor;
- }
- @@ -44,6 +45,9 @@
- return *io_service_;
- }
-
- + #ifndef CPPCMS_WIN32
- + std::auto_ptr<process_shared_acceptor> process_shared_acceptor_;
- + #endif
-
- private:
- friend class cppcms::service;
- Index: CMakeLists.txt
- ===================================================================
- --- CMakeLists.txt (revision 1153)
- +++ CMakeLists.txt (working copy)
- @@ -520,7 +520,7 @@
- if(WIN32 AND NOT CYGWIN)
- set(CPPCMS_SOURCES ${CPPCMS_SOURCES} session_win32_file_storage.cpp)
- else(WIN32 AND NOT CYGWIN)
- - set(CPPCMS_SOURCES ${CPPCMS_SOURCES} session_posix_file_storage.cpp)
- + set(CPPCMS_SOURCES ${CPPCMS_SOURCES} session_posix_file_storage.cpp proc_acceptor.cpp)
- endif(WIN32 AND NOT CYGWIN)
-
- if(NOT DISABLE_SHARED)
- Index: proc_acceptor.h
- ===================================================================
- --- proc_acceptor.h (revision 1153)
- +++ proc_acceptor.h (working copy)
- @@ -29,7 +29,7 @@
- ~process_shared_acceptor();
-
- template<typename Socket1,typename Socket2>
- - void async_accept(Socket1 acc,Socket2 sock,on_accept_function const &on_accepted)
- + void async_accept(Socket1 &acc,Socket2 &sock,on_accept_function const &on_accepted)
- {
- int fd = acc.native();
- {
- @@ -38,16 +38,28 @@
- if(fd + 1 > max_1_ )
- max_1_ = fd+1;
- callbacks_[fd]=on_accepted;
- - assigners_[fd]=boost::bind(&Socket2::assign,&sock,_1);
- + assigner<Socket2> as = { &sock, acc.local_endpoint().protocol() };
- + assigners_[fd]=as;
- }
- wake();
- }
- - void stop()
- + void start();
- + void stop();
- + private:
- +
- + template<typename Socket> // Callable stored in assigners_ by async_accept(): attaches an accepted descriptor to a target socket.
- + struct assigner
- {
- - stop_ = true;
- - wake();
- - }
- - private:
- + typedef typename Socket::protocol_type protocol_type;
- + Socket *sock; // target socket, held by raw pointer; NOTE(review): must stay alive until the functor runs - confirm with callers
- + protocol_type proto; // filled in async_accept() from the listening socket's local_endpoint().protocol()
- + void operator()(int fd) const
- + {
- + sock->assign(proto,fd); // hand the already-accepted descriptor fd to the target socket
- + }
- + };
- +
- +
- void on_fork();
- void wake();
- bool run_one();
- @@ -64,6 +76,7 @@
- std::vector<on_accept_function> callbacks_;
- std::vector<assign_function> assigners_;
- boost::asio::io_service *service_;
- + std::auto_ptr<boost::thread> worker_;
- }
- ;
- }
|