Browse Source

Added an interface for server side events streaming with long polling support

master
Artyom Beilis 11 years ago
parent
commit
20eadf8e25
8 changed files with 1079 additions and 0 deletions
  1. +65
    -0
      contrib/server_side/sse/examples/chat.cpp
  2. +18
    -0
      contrib/server_side/sse/examples/config-chat.js
  3. +18
    -0
      contrib/server_side/sse/examples/config-ticker.js
  4. +42
    -0
      contrib/server_side/sse/examples/the_chat.html
  5. +30
    -0
      contrib/server_side/sse/examples/the_ticker.html
  6. +76
    -0
      contrib/server_side/sse/examples/ticker.cpp
  7. +391
    -0
      contrib/server_side/sse/server_sent_events.cpp
  8. +439
    -0
      contrib/server_side/sse/server_sent_events.h

+ 65
- 0
contrib/server_side/sse/examples/chat.cpp View File

@@ -0,0 +1,65 @@
//
// Copyright (C) 2008-2012 Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>
//
// See accompanying file COPYING.TXT file for licensing details.
//
#include <cppcms/application.h>
#include <cppcms/url_dispatcher.h>
#include <cppcms/applications_pool.h>
#include <cppcms/service.h>
#include <cppcms/http_response.h>
#include <cppcms/http_request.h>
#include <cppcms/http_context.h>
#include <booster/intrusive_ptr.h>
#include "server_sent_events.h"

#include <set>

class chat : public cppcms::application {
public:
chat(cppcms::service &srv) :
cppcms::application(srv)
{
queue_ = sse::bounded_event_queue::create(srv.get_io_service(),1024);
queue_->enable_keep_alive(10);
dispatcher().assign("/post",&chat::post,this);
dispatcher().assign("/get",&chat::get,this);
dispatcher().assign(".*",&chat::redirect,this);
}
void redirect()
{
response().set_redirect_header("/the_chat.html");
}
void post()
{
if(request().request_method()=="POST") {
std::string message = request().post("message");
queue_->enqueue(message);
}
}
void get()
{
queue_->accept(release_context());
}
private:
booster::shared_ptr<sse::bounded_event_queue> queue_;
};


int main(int argc,char **argv)
{
try {
cppcms::service service(argc,argv);
booster::intrusive_ptr<chat> c=new chat(service);
service.applications_pool().mount(c);
service.run();
}
catch(std::exception const &e) {
std::cerr<<"Catched exception: "<<e.what()<<std::endl;
return 1;
}
return 0;
}

// vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4


+ 18
- 0
contrib/server_side/sse/examples/config-chat.js View File

@@ -0,0 +1,18 @@
{
"service" : {
"api" : "http",
"port" : 8080
},
"http" : {
"script" : "/chat"
},
"logging" : {
"level" : "debug"
},
"file_server" : {
"enable" : true,
"document_root" : "."
},

}


+ 18
- 0
contrib/server_side/sse/examples/config-ticker.js View File

@@ -0,0 +1,18 @@
{
"service" : {
"api" : "http",
"port" : 8080
},
"http" : {
"script" : "/ticker"
},
"logging" : {
"level" : "debug"
},
"file_server" : {
"enable" : true,
"document_root" : "."
},

}


+ 42
- 0
contrib/server_side/sse/examples/the_chat.html View File

@@ -0,0 +1,42 @@
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js"></script>
<title>Chat Room</title>
</head>
<body>
<h1>Chat room</h1>
<script type="text/javascript">
var message_count = 0;
function send_data() {
$.post("/chat/post", {'message': $('#message').val()});
$("#message").val("");
return false;
}
function read_data() {
var stream = new EventSource('/chat/get');

stream.onmessage = function(e){
var text = e.data.replace(/\n/g,'\n<br/>');
var text = '<p>' + text + '</p>'
$('#messages').html($('#messages').html() + text);
};
stream.onerror = function(e){
console.log(e);
};
}

read_data();
</script>
<form id="theform" >
<textarea id="message" name="message"></textarea>
<input type="submit" value="Send" onclick="return send_data()"/>
</form>
<div id="messages">
</div>
</body>


+ 30
- 0
contrib/server_side/sse/examples/the_ticker.html View File

@@ -0,0 +1,30 @@
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
<!-- <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js"></script> -->
<title>Chat Room</title>
</head>
<body>
<h1>Stock Price</h1>
<script type="text/javascript">
function read_data() {
var stream = new EventSource('/ticker');
stream.onmessage = function(e){
console.log(e);
console.log(e.type);
document.getElementById('price').innerHTML=e.data;
};

stream.onerror = function(e){
console.log(e);
};
}

read_data();
</script>
<p>Price:<span id="price"></span></p>
</body>


+ 76
- 0
contrib/server_side/sse/examples/ticker.cpp View File

@@ -0,0 +1,76 @@
//
// Copyright (C) 2008-2012 Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>
//
// See accompanying file COPYING.TXT file for licensing details.
//
#include "server_sent_events.h"

#include <cppcms/application.h>
#include <cppcms/applications_pool.h>
#include <cppcms/service.h>
#include <cppcms/http_context.h>
#include <booster/aio/deadline_timer.h>
#include <booster/system_error.h>

#include <sstream>

class ticker : public cppcms::application {
public:
ticker(cppcms::service &srv) :
cppcms::application(srv),
tm_(srv.get_io_service()),
price_(1.0)
{
stream_ = sse::state_stream::create(srv.get_io_service());
wait();
}

void wait()
{
tm_.expires_from_now(booster::ptime::from_number(double(rand())/RAND_MAX + 0.01));
tm_.async_wait([=](booster::system::error_code const &e){
if(!e) {
on_timer();
wait();
}
});
}

void on_timer()
{
price_ += double(rand()) / RAND_MAX * 2.0 - 1;
if(price_ <= 0.01)
price_ = 0.01;
std::ostringstream ss;
ss << price_;
stream_->update(ss.str());
}
void main(std::string /*url*/)
{
stream_->accept(release_context());
}
private:
booster::shared_ptr<sse::state_stream> stream_;
booster::aio::deadline_timer tm_;
double price_;
};


int main(int argc,char **argv)
{
try {
cppcms::service service(argc,argv);
booster::intrusive_ptr<ticker> c=new ticker(service);
service.applications_pool().mount(c);
service.run();
}
catch(std::exception const &e) {
std::cerr<<"Catched exception: "<<e.what()<<std::endl;
return 1;
}
return 0;
}
// vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4



+ 391
- 0
contrib/server_side/sse/server_sent_events.cpp View File

@@ -0,0 +1,391 @@
#include "server_sent_events.h"

#include <cppcms/http_response.h>
#include <cppcms/http_request.h>
#include <booster/system_error.h>
#include <stdio.h>
#include <string.h>

namespace sse {

void write_event(std::ostream &out,
char const *id,
char const *data,
char const *event)
{
out << "id:" << id <<"\n";
if(event && *event)
out<< "event:" << event <<"\n";
out << "data:";
for(char const *p=data;*p;) {
char c=*p;
if(c=='\r') {
p++;
continue;
}
else if(c=='\n') {
p++;
out << "\ndata:";
}
else {
char const *e=p;
while((c=*e)!=0 && c!='\r' && c!='\n')
e++;
out.write(p,e-p);
p=e;
}
}
out << "\n\n";
}

event_stream::event_stream()
{
}

event_stream::event_stream(booster::shared_ptr<cppcms::http::context> ctx) : ctx_(ctx)
{
}
event_stream::event_stream(event_stream const &other) : ctx_(other.ctx_),last_id_(other.last_id_)
{
}

event_stream &event_stream::operator=(event_stream const &other)
{
ctx_ = other.ctx_;
last_id_ = other.last_id_;
return *this;
}


event_stream::~event_stream()
{
}

char const *event_stream::last_id()
{
return last_id_.c_str();
}


size_t event_stream::last_integer_id()
{
return atoi(last_id());
}

void event_stream::last_id(char const *id)
{
last_id_ = id;
}

void event_stream::last_integer_id(size_t id)
{
char buf[128];
snprintf(buf,sizeof(buf),"%lld",static_cast<unsigned long long>(id));
last_id(buf);
}


void event_stream::write(char const *data,char const *id,char const *event)
{
write_event(ctx_->response().out(),id,data,event);
last_id(id);
}

void event_stream::write(char const *data,size_t n,char const *event)
{
char buf[32];
snprintf(buf,sizeof(buf),"%ld",static_cast<unsigned long>(n));
write(data,buf,event);
}

void event_stream::write(std::string const &data,std::string const &id,std::string const &event)
{
if(event.empty())
write(data.c_str(),id.c_str());
else
write(data.c_str(),id.c_str(),event.c_str());
}
void event_stream::write(std::string const &data,size_t id,std::string const &event)
{
if(event.empty())
write(data.c_str(),id);
else
write(data.c_str(),id,event.c_str());
}

booster::shared_ptr<cppcms::http::context> event_stream::context()
{
return ctx_;
}



event_source::event_source(booster::aio::io_service &srv) :
polling_env_var_("HTTP_X_SSE_SIMULATION"),
polling_value_("long-polling"),
closing_(false),
timer_(srv)
{
last_broadcast_ = booster::ptime::now();
}

event_source::~event_source()
{
}

namespace details {

typedef booster::intrusive_ptr<booster::callable<void(cppcms::http::context::completion_type status)> > comp_ptr;
typedef booster::intrusive_ptr<booster::callable<void()> > disco_ptr;
typedef booster::intrusive_ptr<booster::callable<void(booster::system::error_code const &)> > ka_ptr;

class post_send : public booster::callable<void(cppcms::http::context::completion_type status)> {
public:
post_send(event_stream const &es,booster::weak_ptr<event_source> q) :
stream_(es),
queue_(q)
{
}
event_stream &stream()
{
return stream_;
}
void operator()(cppcms::http::context::completion_type status)
{
if(status!=0)
return;
booster::shared_ptr<event_source> queue = queue_.lock();
if(!queue)
return;
if(queue->closing_) {
stream_.context()->async_complete_response();
return;
}
if(queue->on_sent(stream_))
stream_.context()->async_flush_output(comp_ptr(this));
else {
booster::intrusive_ptr<post_send> self(this);
queue->streamers_.insert(self);
}
}
private:
event_stream stream_;
booster::weak_ptr<event_source> queue_;
};

class remove_poller : public booster::callable<void()> {
public:
remove_poller(booster::shared_ptr<cppcms::http::context> ctx,booster::weak_ptr<event_source> q) :
w_ctx_(ctx),
queue_(q)
{
}
virtual void operator()()
{
booster::shared_ptr<event_source> self = queue_.lock();
if(!self)
return;
booster::shared_ptr<cppcms::http::context> ctx = w_ctx_.lock();
if(ctx) {
event_stream es(ctx);
self->long_pollers_.erase(es);
}
}
private:
booster::weak_ptr<cppcms::http::context> w_ctx_;
booster::weak_ptr<event_source> queue_;
};

class remove_streamer : public booster::callable<void()> {
public:
remove_streamer(booster::intrusive_ptr<details::post_send> ev,booster::weak_ptr<event_source> q) :
ev_(ev),
queue_(q)
{
}
virtual void operator()()
{
booster::shared_ptr<event_source> self = queue_.lock();
if(self) {
self->streamers_.erase(ev_);
}
}
private:
booster::intrusive_ptr<details::post_send> ev_;
booster::weak_ptr<event_source> queue_;
};

class keep_alive_updater : public booster::callable<void(booster::system::error_code const &e)> {
public:
keep_alive_updater(booster::shared_ptr<event_source> s) : queue_(s)
{
}
virtual void operator()(booster::system::error_code const &e)
{
if(e)
return;
booster::ptime diff = booster::ptime::now() - queue_->last_broadcast_;
if(diff >= queue_->idle_limit_) {
queue_->keep_alive();
queue_->timer_.expires_from_now(queue_->idle_limit_);
}
else {
queue_->timer_.expires_from_now(queue_->idle_limit_ - diff);
}
queue_->timer_.async_wait(ka_ptr(this));
}
private:
booster::shared_ptr<event_source> queue_;
};

}

void event_source::enable_keep_alive(double idle)
{
if(idle <= 0.001)
throw booster::runtime_error("sse:enable_keep_alive the idle parameter is too small");
booster::ptime limit = booster::ptime::from_number(idle);
if(idle_limit_ == booster::ptime::zero) {
idle_limit_ = limit;
details::ka_ptr ptr(new details::keep_alive_updater(shared_from_this()));
(*ptr)(booster::system::error_code()); // start it
}
else {
idle_limit_ = limit; // just update
}
}

void event_source::accept(booster::shared_ptr<cppcms::http::context> ctx)
{
if(closing_)
throw booster::logic_error("sse:accept is called after close operation started");
event_stream es(ctx);
ctx->response().content_type("text/event-stream"); // no need to specify UTF-8
ctx->response().cache_control("no-cache");

bool got_something = false;
char const *env = ctx->request().cgetenv("HTTP_LAST_EVENT_ID");
if(*env == 0) {
got_something = on_connect(es);
}
else {
es.last_id(env);
got_something = on_reconnect(es);
}
bool polling = check_polling(ctx);
if(polling) {
if(got_something)
ctx->async_complete_response();
else {
ctx->async_on_peer_reset(details::disco_ptr(new details::remove_poller(ctx,shared_from_this())));
long_pollers_.insert(es);
}
}
else {
booster::intrusive_ptr<details::post_send> ps = new details::post_send(es,shared_from_this());
ctx->async_on_peer_reset(details::disco_ptr(new details::remove_streamer(ps,shared_from_this())));
if(got_something) {
ctx->async_flush_output(ps);
}
else {
streamers_.insert(ps);
}
}
}

bool event_source::check_polling(booster::shared_ptr<cppcms::http::context> ctx)
{
char const *val = ctx->request().cgetenv(polling_env_var_.c_str());
if(val[0]==0) // undefined
return false;
if(polling_value_.empty()) // accept any value
return true;
return val == polling_value_;
}

void event_source::close()
{
closing_ = true;
for(streamers_type::iterator p=streamers_.begin();p!=streamers_.end();++p) {
booster::intrusive_ptr<details::post_send> ptr = *p;
ptr->stream().context()->async_complete_response();
}
streamers_.clear();

for(long_pollers_type::iterator p=long_pollers_.begin();p!=long_pollers_.end();++p) {
event_stream es = *p;
es.context()->async_complete_response();
}
long_pollers_.clear();

if(idle_limit_!=booster::ptime::zero)
timer_.cancel();
}

size_t event_source::waiting()
{
return long_pollers_.size() + streamers_.size();
}

void event_source::keep_alive(char const *comment)
{
if(closing_) {
return;
}
for(streamers_type::iterator it=streamers_.begin();it!=streamers_.end();) {
booster::intrusive_ptr<details::post_send> ps = *it;
streamers_type::iterator tmp = it++;
streamers_.erase(tmp);

ps->stream().context()->response().out() << ':' << comment << "\n\n";
ps->stream().context()->async_flush_output(ps);
}

for(long_pollers_type::iterator it=long_pollers_.begin();it!=long_pollers_.end();) {
event_stream stream = *it;
long_pollers_type::iterator tmp = it++;
long_pollers_.erase(tmp);
stream.context()->response().out() << ':' << comment << "\n\n";
stream.context()->async_complete_response();
}
last_broadcast_ = booster::ptime::now();
}


void event_source::broadcast()
{
if(closing_) {
throw booster::logic_error("sse:can't call broadcast after close is called");
}
for(streamers_type::iterator it=streamers_.begin();it!=streamers_.end();) {
booster::intrusive_ptr<details::post_send> ps = *it;
if(!send(ps->stream())) {
++it;
}
else {
streamers_type::iterator tmp = it++;
streamers_.erase(tmp);
ps->stream().context()->async_flush_output(ps);
}
}

for(long_pollers_type::iterator it=long_pollers_.begin();it!=long_pollers_.end();) {
event_stream stream = *it;
if(!send(stream)) {
++it;
}
else {
long_pollers_type::iterator tmp = it++;
long_pollers_.erase(tmp);
stream.context()->async_complete_response();
}
}
if(long_pollers_.empty() && streamers_.empty())
last_broadcast_ = booster::ptime::now();
}

} // sse

+ 439
- 0
contrib/server_side/sse/server_sent_events.h View File

@@ -0,0 +1,439 @@
//
// Copyright (C) 2009-2012 Artyom Beilis (Tonkikh)
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef SERVER_SENT_EVENTS_H
#define SERVER_SENT_EVENTS_H

#include <booster/noncopyable.h>
#include <cppcms/http_context.h>
#include <booster/shared_ptr.h>
#include <booster/enable_shared_from_this.h>
#include <booster/intrusive_ptr.h>
#include <booster/aio/deadline_timer.h>
#include <booster/posix_time.h>
#include <set>
#include <map>
#include <vector>
#include <ostream>

namespace sse {

///
/// Basic function that allows to format a proper output for event id, data and event name
/// according to Server-Sent Events protocol
///
void write_event(std::ostream &output,
char const *id,
char const *data,
char const *event=0);

///
/// This class represent an output event stream
///
class event_stream {
public:
explicit event_stream(booster::shared_ptr<cppcms::http::context> ctx);

event_stream();
event_stream(event_stream const &);
event_stream &operator=(event_stream const &);
~event_stream();

bool operator<(event_stream const &other) const
{
return ctx_ < other.ctx_;
}

bool operator==(event_stream const &other) const
{
return ctx_ == other.ctx_;
}
bool operator!=(event_stream const &other) const
{
return ctx_ != other.ctx_;
}

///
/// Get the last event id that was known to client, if not known (new connection)
/// return false
///
char const *last_id();

///
/// Get last event id as integer, if not known returns 0
///
size_t last_integer_id();

///
/// Set last event id
///
void last_id(char const *id);
///
/// Set last event id as number
///
void last_integer_id(size_t id);
///
/// Write event to stream, note last_id() is updated automatically
///
void write(char const *data,char const *id,char const *event=0);
///
/// Write event to stream, note last_id() is updated automatically
///
void write(char const *data,size_t id,char const *event=0);

///
/// Write event to stream, note last_id() is updated automatically
///
void write(std::string const &data,std::string const &id,std::string const &event=std::string());
///
/// Write event to stream, note last_id() is updated automatically
///
void write(std::string const &data,size_t id,std::string const &event=std::string());

///
/// Get the context associated with the object
///
booster::shared_ptr<cppcms::http::context> context();

private:
booster::shared_ptr<cppcms::http::context> ctx_;
std::string last_id_;
};

namespace details {
class post_send;
class remove_poller;
class remove_streamer;
class keep_alive_updater;
};

///
/// This class represents an basic event source object that allows you handle multiple
/// connections simultaneously
///

class event_source : public booster::enable_shared_from_this<event_source> {
protected:
///
/// Create a new queue
///
/// Note you must always create it using booster::shared_ptr<new DrivedClassFromEventSource>
///
event_source(booster::aio::io_service &srv);
public:

virtual ~event_source();

///
/// Set a header that would be used to detect that client uses XHR long polling rather
/// than EventSource. The first parameter is CGI variable and the second the expected
/// value, if the value is empty, any non-empty data coming with this header would be
/// considered a long polling request.
///
/// For example for a header "X-Long-Polling: true" call polling_detector("HTTP_X_LONG_POLLING","true");
///
/// The default is: "HTTP_X_SSE_SIMULATION","long-polling" that requires HTTP Header "X-SSE-Simulation:long-polling"
///
void polling_detector(std::string const &cgi_variable,std::string const &value);

///
/// Accept new connection
///
virtual void accept(booster::shared_ptr<cppcms::http::context> ctx);

///
/// Broadcast event to all connections waiting for event
///
/// this->send() would be called for all pending connections
///
virtual void broadcast();

///
/// Close all outstanding connections
///
/// If accept or broadcast is called after async_close is called, it would throw booster::logic_error error
///
virtual void close() ;

///
/// Get a number of connections waiting for new event to be broadcasted
///
size_t waiting();
///
/// Send keep alive notification to clients - basically send a single line comment
/// to make sure that clients that do not work would be disconnected
///
virtual void keep_alive(char const *comment = "keep alive");
///
/// Send keep_alive notification to clients every \a idle seconds if no notifications were
/// sent during this period. Disabled by default.
///
void enable_keep_alive(double idle = 30.0);

protected:
///
/// This callback is called upon new connection without header Last-Event-ID
///
/// Return true if some data was sent to the stream
///
virtual bool on_connect(event_stream &ev)
{
return on_sent(ev);
}
///
/// This callback is called upon new connection with non-empty header Last-Event-ID
///
/// Return true if some data was sent to the stream
///
virtual bool on_reconnect(event_stream &ev)
{
return on_sent(ev);
}
///
/// Send new events that got to stream. This function is called upon broadcast() call
///
/// Return true if new data was written, otherwise return false.
///
virtual bool send(event_stream &ev)
{
return on_sent(ev);
}
///
/// After sending events to client is complete this function is called. You are expected
/// to check if there some new events occurred using last_id() or last_integer_id() values
/// and send them to client returning true, otherwise return false
///
/// Return true if some data was sent to the stream
///
///
/// This is a minimal function that should be implemented for stream handing
///
virtual bool on_sent(event_stream &) = 0;


///
/// By default checks the HTTP header, you can override it to use your own method
///
virtual bool check_polling(booster::shared_ptr<cppcms::http::context> ctx);

private:
friend class details::post_send;
friend class details::remove_streamer;
friend class details::remove_poller;
friend class details::keep_alive_updater;


typedef std::set<booster::intrusive_ptr<details::post_send> > streamers_type;
typedef std::set<event_stream> long_pollers_type;

streamers_type streamers_;
long_pollers_type long_pollers_;

std::string polling_env_var_;
std::string polling_value_;
bool closing_;
booster::aio::deadline_timer timer_;
booster::ptime last_broadcast_;
booster::ptime idle_limit_;
};



///
/// State Stream a class that keeps the client updated about the latest events.
///
/// Consider stock prices. User needs to know the latest value, so if he connects
/// later or misses some of the values he receives all events or events he does
/// not read yet.
///
class state_stream : public event_source {
protected:
state_stream(booster::aio::io_service &srv) :
event_source(srv)
{
}
public:
static booster::shared_ptr<state_stream> create(booster::aio::io_service &srv)
{
booster::shared_ptr<state_stream> p(new state_stream(srv));
return p;
}

///
/// Update an event - default message. If \a send is false the messages are not dispatched
/// immediately, you can dispatch them later by calling broadcast() or by updating an
/// event with send=true
///
void update(std::string const &data,bool send=true)
{
update(std::string(),data,send);
}
///
/// Update a named event. If \a send is false the messages are not dispatched
/// immediately, you can dispatch them later by calling broadcast() or by updating an
/// event with send=true
///
void update(std::string const &event,std::string const &data,bool send=true)
{
current_++;
state &st = states_[event];
st.updated = current_;
st.data = data;
if(send)
broadcast();
}
protected:

bool on_sent(event_stream &es)
{
bool has_something = false;
size_t last_id = es.last_integer_id();
for(states_type::iterator p=states_.begin();p!=states_.end();++p) {
if(p->second.updated > last_id) {
if(p->first.empty())
es.write(p->second.data,current_);
else
es.write(p->second.data,current_,p->first);
has_something = true;
}
}
return has_something;
}

private:
struct state {
size_t updated;
std::string data;
};
size_t current_;
typedef std::map<std::string,state> states_type;
states_type states_;
};


///
/// This is an event queue object. It keeps limited number of messages, such that if the
/// user hadn't received messages for a long time he would be able to receive only
/// the latest ones.
///
class bounded_event_queue : public event_source {
protected:
bounded_event_queue(booster::aio::io_service &srv,size_t size,size_t limit = 256) :
event_source(srv),
start_(0),
end_(0),
capacity_(size),
limit_(limit)
{
messages_.reserve(size);
}
public:
///
/// Create a queue of maximal size \a size, such that user that connects too late
/// it would be able to receive at most \a size latest messages
///
/// It is also possible to set the maximal number of messages send in one operation
/// uning \a limit paeameter. See: \ref message_send_limit
///
static booster::shared_ptr<bounded_event_queue> create( booster::aio::io_service &srv,
size_t size,
size_t limit=256)
{
booster::shared_ptr<bounded_event_queue> p(new bounded_event_queue(srv,size,limit));
return p;
}
///
/// message_send_limit defines a buffer limit for a single send operation such that
/// when a user reads very long queue from the beginning he would not cause delays in the event loop.
///
void message_send_limit(size_t n)
{
limit_ = n;
}

///
/// Enqueue a new event of default "message" type
///
/// If \a send is false the messages are not dispatched
/// immediately, you can dispatch them later by calling broadcast() or by calling enqueue
/// event with send=true
///
void enqueue(std::string const &data,bool send=true)
{
enqueue(std::string(),data,send);
}
///
/// Enqueue a new event of type \a event
///
/// If \a send is false the messages are not dispatched
/// immediately, you can dispatch them later by calling broadcast() or by calling enqueue
/// event with send=true
///
void enqueue(std::string const &event,std::string const &data,bool send=true)
{
message *msg = 0;
if(end_ >= capacity_) {
msg = &messages_[end_ % capacity_];
end_++;
start_++;
}
else {
messages_.push_back(message());
msg = &messages_.back();
end_++;
}
msg->event = event;
msg->data = data;
if(send)
broadcast();
}
protected:
bool on_sent(event_stream &es)
{
size_t last_id = es.last_integer_id();
if(last_id > end_) {
es.last_integer_id(end_);
return false;
}
if(last_id==end_)
return false;
if(last_id < start_) {
last_id = start_;
}

for(size_t id = last_id + 1,counter=0;id<=end_ && counter<limit_;id++,counter++) {
message &msg = messages_[ (id-1) % capacity_ ];
es.write(msg.data,id,msg.event);
}
return true;
}


private:

struct message {
std::string event;
std::string data;
};
size_t start_;
size_t end_;
size_t capacity_;
size_t limit_;
std::vector<message> messages_;
};



} // namespace sse


#endif

Loading…
Cancel
Save