- Asynchronous JsonRPC handling on both client and server side - Errors and timeout handlingmaster
@@ -24,6 +24,33 @@ | |||||
/// - id - JSONRPC id. It should be null for notification methods | /// - id - JSONRPC id. It should be null for notification methods | ||||
/// and it should be some integer or string for function methods | /// and it should be some integer or string for function methods | ||||
/// | /// | ||||
/// Each method given in the constructor would have following properties: | |||||
/// | |||||
/// on_error(e) - Returned error, where e.type is one of 'transport', 'protocol', 'response' and | |||||
/// e.error is the error object. | |||||
/// on_result(r) - Returned method result, or on_result() - for notifications. | |||||
/// | |||||
/// For example | |||||
/// | |||||
/// var rpc = new JsonRPC('/chat',['getValue','getStatistics'],['updateValue']); | |||||
/// | |||||
/// // Asynchronouse method | |||||
/// | |||||
/// rpc.getValue.on_error = function(e) { alert('Error:' + e.error); } | |||||
/// rpc.getValue.on_result = function(r) { alert(r); } | |||||
/// | |||||
/// rpc.getValue(); | |||||
/// | |||||
/// // Synchronous method | |||||
/// | |||||
/// // not setting callbacks or setting on_error and on_result to null | |||||
/// // makes them synchronous rpc calls. For example; | |||||
/// | |||||
/// alert(rpc.getStatistics()); | |||||
/// rpc.updateValue(10); | |||||
/// | |||||
/// | |||||
function JsonRPC(uri,function_methods,notification_methods) { | function JsonRPC(uri,function_methods,notification_methods) { | ||||
if(!(this instanceof JsonRPC)) | if(!(this instanceof JsonRPC)) | ||||
return new JsonRPC(uri,function_methods,notification_methods); | return new JsonRPC(uri,function_methods,notification_methods); | ||||
@@ -75,7 +102,13 @@ JsonRPC.prototype.syncCall = function(name,id,params) { | |||||
if(xhr.status!=200) | if(xhr.status!=200) | ||||
throw Error('Invalid response:' + xhr.status); | throw Error('Invalid response:' + xhr.status); | ||||
if(id!=null) { | if(id!=null) { | ||||
var response = JSON.parse(xhr.responseText); | |||||
var response = null; | |||||
try { | |||||
response = JSON.parse(xhr.responseText); | |||||
} | |||||
catch(e) { | |||||
throw Error('Invalid JSON-RPC response'); | |||||
} | |||||
if(response.error != null) | if(response.error != null) | ||||
throw Error(response.error); | throw Error(response.error); | ||||
return response.result; | return response.result; | ||||
@@ -95,7 +128,14 @@ JsonRPC.prototype.asyncCall = function(name,id,params,on_result,on_error) { | |||||
return; | return; | ||||
if(xhr.status==200) { | if(xhr.status==200) { | ||||
if(id!=null) { | if(id!=null) { | ||||
var response = JSON.parse(xhr.responseText); | |||||
var response = null; | |||||
try { | |||||
response = JSON.parse(xhr.responseText); | |||||
} | |||||
catch(e) { | |||||
on_error({'type' : 'protocol', 'error' : 'invalid response'}); | |||||
return; | |||||
} | |||||
if(response.error != null) { | if(response.error != null) { | ||||
on_error({'type': 'response', 'error' : response.error }); | on_error({'type': 'response', 'error' : response.error }); | ||||
} | } | ||||
@@ -41,48 +41,55 @@ public: | |||||
cppcms::rpc::json_rpc_server(srv), | cppcms::rpc::json_rpc_server(srv), | ||||
timer_(srv.get_io_service()) | timer_(srv.get_io_service()) | ||||
{ | { | ||||
// Our main methods | |||||
bind("post",cppcms::rpc::json_method(&chat::post,this),notification_role); | bind("post",cppcms::rpc::json_method(&chat::post,this),notification_role); | ||||
bind("get",cppcms::rpc::json_method(&chat::get,this),method_role); | bind("get",cppcms::rpc::json_method(&chat::get,this),method_role); | ||||
// Add timeouts to the system | |||||
last_wake_ = time(0); | last_wake_ = time(0); | ||||
on_timer(booster::system::error_code()); | on_timer(booster::system::error_code()); | ||||
} | } | ||||
// Handle new message call | |||||
void post(std::string const &author,std::string const &message) | void post(std::string const &author,std::string const &message) | ||||
{ | { | ||||
cppcms::json::value obj; | cppcms::json::value obj; | ||||
obj["author"]=author; | obj["author"]=author; | ||||
obj["message"]=message; | obj["message"]=message; | ||||
messages_.push_back(obj); | messages_.push_back(obj); | ||||
last_wake_ = time(0); | |||||
broadcast(messages_.size()-1); | broadcast(messages_.size()-1); | ||||
} | } | ||||
void on_timer(booster::system::error_code const &e) | void on_timer(booster::system::error_code const &e) | ||||
{ | { | ||||
if(e) return; // cancelation | if(e) return; // cancelation | ||||
if(last_wake_ - time(0) > 10) { | |||||
// check idle connections for more then 10 seconds | |||||
if(time(0) - last_wake_ > 10) { | |||||
broadcast(messages_.size()); | broadcast(messages_.size()); | ||||
last_wake_ = time(0); | |||||
} | } | ||||
// restart timer | |||||
timer_.expires_from_now(booster::ptime::seconds(1)); | timer_.expires_from_now(booster::ptime::seconds(1)); | ||||
timer_.async_wait(boost::bind(&chat::on_timer,booster::intrusive_ptr<chat>(this),_1)); | timer_.async_wait(boost::bind(&chat::on_timer,booster::intrusive_ptr<chat>(this),_1)); | ||||
std::cout << "Status: \n" | |||||
<< "Waiters: " << waiters_.size() << '\n' | |||||
<< "Messages:" << messages_.size() <<'\n' | |||||
<< "["; | |||||
for(size_t i=0;i<messages_.size();i++) | |||||
std::cout << messages_[i] << std::endl; | |||||
std::cout <<"]" << std::endl; | |||||
} | } | ||||
// Handle request | |||||
void get(unsigned from) | void get(unsigned from) | ||||
{ | { | ||||
if(from < messages_.size()) { | if(from < messages_.size()) { | ||||
// not long polling - return result now | |||||
return_result(make_response(from)); | return_result(make_response(from)); | ||||
return; | return; | ||||
} | } | ||||
else if(from == messages_.size()) { | else if(from == messages_.size()) { | ||||
// Can't answer now | |||||
// Add long polling request to the list | |||||
booster::shared_ptr<cppcms::rpc::json_call> call=release_call(); | booster::shared_ptr<cppcms::rpc::json_call> call=release_call(); | ||||
waiters_.insert(call); | waiters_.insert(call); | ||||
// set disconnect callback | |||||
call->context().async_on_peer_reset( | call->context().async_on_peer_reset( | ||||
boost::bind( | boost::bind( | ||||
&chat::remove_context, | &chat::remove_context, | ||||
@@ -93,36 +100,53 @@ public: | |||||
return_error("Invalid position"); | return_error("Invalid position"); | ||||
} | } | ||||
} | } | ||||
// handle client disconnect | |||||
void remove_context(booster::shared_ptr<cppcms::rpc::json_call> call) | void remove_context(booster::shared_ptr<cppcms::rpc::json_call> call) | ||||
{ | { | ||||
waiters_.erase(call); | waiters_.erase(call); | ||||
} | } | ||||
void broadcast(size_t from) | void broadcast(size_t from) | ||||
{ | { | ||||
// update timeout | |||||
last_wake_ = time(0); | |||||
// Prepare response | |||||
cppcms::json::value response = make_response(from); | |||||
// Send it to everybody | |||||
for(waiters_type::iterator waiter=waiters_.begin();waiter!=waiters_.end();++waiter) { | for(waiters_type::iterator waiter=waiters_.begin();waiter!=waiters_.end();++waiter) { | ||||
booster::shared_ptr<cppcms::rpc::json_call> call = *waiter; | booster::shared_ptr<cppcms::rpc::json_call> call = *waiter; | ||||
call->return_result(make_response(from)); | |||||
call->return_result(response); | |||||
} | } | ||||
waiters_.clear(); | waiters_.clear(); | ||||
} | } | ||||
// Prepare response to the client | |||||
cppcms::json::value make_response(size_t n) | cppcms::json::value make_response(size_t n) | ||||
{ | { | ||||
cppcms::json::value v; | cppcms::json::value v; | ||||
// Small optimization | |||||
v=cppcms::json::array(); | v=cppcms::json::array(); | ||||
cppcms::json::array &ar = v.array(); | cppcms::json::array &ar = v.array(); | ||||
ar.reserve(messages_.size() - n); | ar.reserve(messages_.size() - n); | ||||
// prepare all messages | |||||
for(size_t i=n;i<messages_.size();i++) { | for(size_t i=n;i<messages_.size();i++) { | ||||
ar.push_back(messages_[i]); | ar.push_back(messages_[i]); | ||||
} | } | ||||
std::cout << "Response to client:" << v << std::endl; | |||||
return v; | return v; | ||||
} | } | ||||
private: | private: | ||||
// message store | |||||
std::vector<cppcms::json::value> messages_; | std::vector<cppcms::json::value> messages_; | ||||
// long poll requests | |||||
typedef std::set<booster::shared_ptr<cppcms::rpc::json_call> > waiters_type; | typedef std::set<booster::shared_ptr<cppcms::rpc::json_call> > waiters_type; | ||||
waiters_type waiters_; | waiters_type waiters_; | ||||
// timer for resetting idle requests | |||||
booster::aio::deadline_timer timer_; | booster::aio::deadline_timer timer_; | ||||
time_t last_wake_; | time_t last_wake_; | ||||
}; | }; | ||||
@@ -8,8 +8,16 @@ | |||||
<title>Chat Room</title> | <title>Chat Room</title> | ||||
<script type="text/javascript"> | <script type="text/javascript"> | ||||
message_count = 0; | |||||
// Global values: | |||||
// RPC object with two methods: | |||||
// | |||||
// get(counter), returns array of objects with properties author and message | |||||
// post(author,message) posts new chat message | |||||
// | |||||
rpc = new JsonRPC('/chat',['get'],['post']); | rpc = new JsonRPC('/chat',['get'],['post']); | ||||
// Messages counter - where to get new messages from, parameter for rpc.get | |||||
message_count = 0; | |||||
function make_error(what,e) | function make_error(what,e) | ||||
{ | { | ||||
@@ -25,7 +33,6 @@ | |||||
'<dd>' + m.message + '</dd>'; | '<dd>' + m.message + '</dd>'; | ||||
message_count++; | message_count++; | ||||
} | } | ||||
//messagesHtml.innerHTML += '<p>JSON: ' + JSON.stringify(messages) + '</p>' ; | |||||
restart(); | restart(); | ||||
} | } | ||||
@@ -35,25 +42,29 @@ | |||||
document.getElementById('reconnect').disabled = false; | document.getElementById('reconnect').disabled = false; | ||||
} | } | ||||
rpc.post.on_error = function(e) { | |||||
make_error('Posting New Messages',e); | |||||
} | |||||
rpc.post.on_result = function() { | rpc.post.on_result = function() { | ||||
// reset the form content | |||||
document.getElementById("message").value = ''; | document.getElementById("message").value = ''; | ||||
} | } | ||||
rpc.post.on_error = function(e) { | |||||
make_error('Posting New Messages',e); | |||||
} | |||||
function restart() | function restart() | ||||
{ | { | ||||
rpc.get(message_count); | rpc.get(message_count); | ||||
} | } | ||||
function reconnect_to_server() | function reconnect_to_server() | ||||
{ | { | ||||
message_count = 0; | message_count = 0; | ||||
document.getElementById('error_message').innerHTML = ''; | document.getElementById('error_message').innerHTML = ''; | ||||
document.getElementById('messages').innerHTML = ''; | |||||
document.getElementById('reconnect').disabled = true; | |||||
restart(); | restart(); | ||||
return false; | return false; | ||||
} | } | ||||
function send_data() { | function send_data() { | ||||
author = document.getElementById('author').value; | author = document.getElementById('author').value; | ||||
message = document.getElementById("message").value; | message = document.getElementById("message").value; |