diff --git a/.gitignore b/.gitignore index 92dda71..7189714 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,7 @@ Gemfile.lock tmp .env pkg +*.o +*.so +*.gem +Makefile diff --git a/README.md b/README.md index 99a4804..8dc2894 100644 --- a/README.md +++ b/README.md @@ -13,16 +13,148 @@ This here is a [Ruby][ruby] wrapper for the official C++ implementation of [Cap' First [install libcapnp][libcapnp-install], then install the gem: ```bash -gem install capn_proto --pre +gem install capn_proto-rpc --pre ``` -The native extension for this gem requires a C++ compiler with C++11 features, so use the same C++ compiler and flags that you used to compile libcapnp (e.g. `CXX` and `CXXFLAGS`). As an OSX user, having followed the [instructions for installing libcapnp on OSX][libcapnp-install], the correct incantation is as follows: +or add this to your Gemfile + ```bash -CXX=$HOME/clang-3.2/bin/clang++ gem install capn_proto --pre +gem capn_proto-rpc +``` + +The native extension for this gem requires a C++ compiler with C++11 features. +I've hardcoded compiler flags directly on the makefile in order to make the install easier. + +# RPC Client Example +note: the schema file, client example and the server example can be found in lib/tests as a minitest. + +The following examples uses this schema: + +```CapnProto +# file hidraCordatus.capnp + +struct Task { + dataint @0 :Int32; + madeBy @1 :Text; +} + +interface Employer { + getWorker @0 () -> ( worker :Worker ); +} + +interface Worker { + put23 @0 (taskToProcess :Task) -> (taskProcessed :Task); +} + +``` +Load all the schemas and methods that will be used then create an EzRpcClient and from it get our client. +```ruby +require 'capn_proto' + +module Hydra extend CapnProto::SchemaLoader + load_schema('./tests/hidraCordatus.capnp') +end + +employer_schema = Hydra::Employer.schema +get_worker_method = Hydra::Employer.method! 'getWorker' +put23method = Hydra::Worker.method! 'put23' + +ezclient = CapnProto::EzRpcClient.new("127.0.0.1:1337",employer_schema) +client = ezclient.client + +``` +Create a request of the method "getWorker" who is in the variable get_worker_method above. +Then, send it storing the pipelined request. +```ruby +request = client.request(get_worker_method) +pipelinedRequest = request.send +``` +get the returned "worker" set the method that we want to request on it and then set +the parameters to be requested, in this case we set dataint to 0. + +```ruby +pipelinedRequest.get('worker').method = put23method +pipelinedRequest.taskToProcess.dataint(0) +``` +now we wait for the results (note that this is the only line that blocks). while we are waiting +the Global interpreter lock is released so we can run ruby code on other threads. +Also note that we use ezclient as a waitscope. +```ruby +results = pipelinedRequest.send.wait(ezclient) +puts results.taskProcessed.dataint +puts results.taskProcessed.madeBy +``` + +# RPC server Example +```ruby +require 'capn_proto' + +module Hydra extend CapnProto::SchemaLoader + load_schema('./tests/hidraCordatus.capnp') +end + +class WorkerServer < CapnProto::CapabilityServer + def initialize(i) + @madeBy = "made by worker ##{i}" + super(Hydra::Worker.schema) + end + + def put23(context) + n = context.getParams.taskToProcess.dataint + context.getResults.taskProcessed.dataint = n + 23 + context.getResults.taskProcessed.madeBy = @madeBy + end +end + +class EmployerServer < CapnProto::CapabilityServer + def initialize(wp) + @worker_pool = wp + @currentWorker = 0 + super(Hydra::Employer.schema) + end + + def get_a_Worker + @currentWorker += 1 + @worker_pool[@currentWorker % @worker_pool.size] + end + + def getWorker(context) + context.getResults.worker = get_a_Worker + end +end + ``` +note that the name of the methods is exactly the same as the name of the function that is defined on the schema and recieves only one argument. This argument is a callContext, you can use the method **getParams** to get the parameters passed to the called method or +use **getResults** to set the results of the request. +regarding to the example, EmployerServer will serve WorkerServers to the clients. -# Example +```ruby +workers = [] +10.times do |i| + workers << WorkerServer.new(i) +end + + +e = CapnProto::EzRpcServer.new(EmployerServer.new(workers), "*:1337") +puts "serving EmployerServer on 1337..." +e.run + +``` +create ten workers, then a EzRpcServer wich binds to port 1337. Then run it. +``` +the results of running the server/client pair is : + +23 +"made by worker #1" +23 +"made by worker #2" +23 +"made by worker #3" +23 +... +``` +# Structs Example ```ruby require 'capn_proto' @@ -99,13 +231,18 @@ What's implemented: - Message writing - To byte string - To file descriptor +- RPC + - loading InterfaceSchema and their methods + - RPC client + - RPC server What's to come: - More reading/writing mechanisms: - Packing/unpacking - Extensive test coverage - Proper support for [JRuby][jruby] -- Support for RPC +- There is a known bug where the servers don't exits when pressing control-c. It only exits after +pressing control-c and then a request is made from a client. [logo]: https://raw.github.com/cstrahan/capnp-ruby/master/media/captain_proto_small.png "Cap'n Proto" [ruby]: http://www.ruby-lang.org/ "Ruby" diff --git a/capn_proto.gemspec b/capn_proto.gemspec index d17fb63..c81e44b 100644 --- a/capn_proto.gemspec +++ b/capn_proto.gemspec @@ -1,10 +1,11 @@ require File.expand_path('../lib/capn_proto/version', __FILE__) Gem::Specification.new do |s| - s.name = 'capn_proto' + s.name = 'capn_proto-rpc' s.license = 'MIT' s.version = CapnProto::VERSION + s.required_ruby_version = '>= 2.0' s.summary = "Cap'n Proto (libcapnp) bindings for Ruby." @@ -14,19 +15,20 @@ Gem::Specification.new do |s| "From the Cap'n Proto documentation: " \ "\"Cap'n Proto is an insanely fast data interchange format and " \ "capability-based RPC system. Think JSON, except binary. " \ - "Or think Protocol Buffers, except faster.\"" + "Or think Protocol Buffers, except faster.\" " \ + "This is a extended version of the original gem Capnproto which adds RPC support visit the homepage to view usage" - s.homepage = 'https://github.com/cstrahan/capnp-ruby' + s.homepage = 'https://github.com/nemoNoboru/capnp-ruby' - s.authors = ['Charles Strahan'] - s.email = ['charles.c.strahan@gmail.com'] + s.authors = ['Charles Strahan','Felipe Vieira'] + s.email = ['charles.c.strahan@gmail.com','felipetavres@gmail.com'] s.add_development_dependency 'rspec', '2.14.1' - s.add_development_dependency 'rake' + s.add_development_dependency 'rake' , '~> 0' s.add_development_dependency 'rake-compiler', '0.7.6' - s.add_development_dependency 'awesome_print' - s.add_development_dependency 'interactive_editor' + s.add_development_dependency 'awesome_print', '~> 0' + s.add_development_dependency 'interactive_editor', '~> 0' s.extensions = ['ext/capn_proto/extconf.rb'] s.require_paths = ['lib'] diff --git a/ext/capn_proto/EzRpc_client.cc b/ext/capn_proto/EzRpc_client.cc new file mode 100644 index 0000000..a7eeaff --- /dev/null +++ b/ext/capn_proto/EzRpc_client.cc @@ -0,0 +1,52 @@ +#include "ruby_capn_proto.h" +#include "EzRpc_client.h" +#include "capability_client.h" +#include "dynamic_capability_client.h" +#include "class_builder.h" +#include "exception.h" +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::EzRpcClient; + VALUE EzRpcCapabilityClient::Class; + + void EzRpcCapabilityClient::Init() { + ClassBuilder("EzRpcClient", rb_cObject). + defineAlloc(&alloc). + defineMethod("client", &make_dynamic). + defineMethod("initialize" , &create). + store(&Class); + } + + void EzRpcCapabilityClient::free(WrappedType* p) { + ruby_xfree(p); + } + + VALUE EzRpcCapabilityClient::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* EzRpcCapabilityClient::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE EzRpcCapabilityClient::create(VALUE self, VALUE dir, VALUE interschema) { + + WrappedType* rb_self = unwrap(self); + new (rb_self) capnp::EzRpcClient(Util::toString(dir)); + + //store the InterfaceSchema + rb_iv_set(self,"schema",interschema); + + return self; + } + + VALUE EzRpcCapabilityClient::make_dynamic(VALUE self){ + VALUE rb_schema = rb_iv_get(self,"schema"); + VALUE rb_cap = CapabilityClient::create(unwrap(self)->getMain()); + return DynamicCapabilityClient::create(rb_cap,rb_schema); + } + +} diff --git a/ext/capn_proto/EzRpc_client.h b/ext/capn_proto/EzRpc_client.h new file mode 100644 index 0000000..611824f --- /dev/null +++ b/ext/capn_proto/EzRpc_client.h @@ -0,0 +1,21 @@ +#ifndef EZRPC_CAPABILITY_CLIENT_H +#define EXRPC_CAPABILITY_CLIENT_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + class EzRpcCapabilityClient { + public: + using WrappedType = capnp::EzRpcClient; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(VALUE self, VALUE dir, VALUE schema); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + static VALUE make_dynamic(VALUE self); + static VALUE Class; + }; +} + + +#endif /* EZRPC_CAPABILITY_CLIENT_H */ diff --git a/ext/capn_proto/EzRpc_server.cc b/ext/capn_proto/EzRpc_server.cc new file mode 100644 index 0000000..1f9ca8e --- /dev/null +++ b/ext/capn_proto/EzRpc_server.cc @@ -0,0 +1,79 @@ +#include "ruby_capn_proto.h" +#include "EzRpc_server.h" +#include "ruby_capability_server.h" +#include "interface_schema.h" +#include "exception.h" +#include "class_builder.h" +#include +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::EzRpcServer; + VALUE EzRpcCapabilityServer::Class; + + void EzRpcCapabilityServer::Init() { + ClassBuilder("EzRpcServer", rb_cObject). + defineAlloc(&alloc). + defineMethod("run" , &process). + defineMethod("initialize" , &create). + store(&Class); + } + + void EzRpcCapabilityServer::free(WrappedType* p) { + ruby_xfree(p); + } + + VALUE EzRpcCapabilityServer::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* EzRpcCapabilityServer::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE EzRpcCapabilityServer::create(VALUE self, VALUE rb_capServer, VALUE dir) { + + VALUE interschema = rb_iv_get(rb_capServer,"@schema"); + auto schema = InterfaceSchema::unwrap(interschema); + + WrappedType* rb_self = unwrap(self); + new (rb_self) capnp::EzRpcServer( kj::heap(*schema, rb_capServer) , Util::toString(dir) ); + + return self; + } + + VALUE EzRpcCapabilityServer::process(VALUE self){ + try{ + loopCall l; + auto server = unwrap(self); + auto to_fulfill = kj::heap>(kj::newPromiseAndFulfiller()); + l.waitscope = &server->getWaitScope(); + l.promisepair = to_fulfill.get(); + rb_thread_call_without_gvl(loopServer, &l, stopLoopServer , l.promisepair); + }catch ( kj::Exception t ){ + Exception::raise(t); + } + return Qtrue; + } + + void * EzRpcCapabilityServer::loopServer(void * p){ + try { + auto* loopcall = (loopCall*) p; + loopcall->promisepair->promise.wait(*(loopcall->waitscope)); + }catch( kj::Exception t ){ + //adquire the lock to raise a ruby exception + rb_thread_call_with_gvl(&Exception::raise,&t); + } + } + + void EzRpcCapabilityServer::stopLoopServer(void *p){ + try { + auto* promisefulfiller = (kj::PromiseFulfillerPair*) p; + promisefulfiller->fulfiller->fulfill(); + }catch( kj::Exception t ){ + Exception::raise(t); + } + } +} diff --git a/ext/capn_proto/EzRpc_server.h b/ext/capn_proto/EzRpc_server.h new file mode 100644 index 0000000..671c89d --- /dev/null +++ b/ext/capn_proto/EzRpc_server.h @@ -0,0 +1,32 @@ +#ifndef EZRPC_CAPABILITY_SERVER_H +#define EZRPC_CAPABILITY_SERVER_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + + typedef struct _loopCall{ + kj::WaitScope* waitscope; + kj::PromiseFulfillerPair* promisepair; + } loopCall; + + class EzRpcCapabilityServer { + public: + using WrappedType = capnp::EzRpcServer; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(VALUE self, VALUE rb_server, VALUE dir); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + static VALUE process(VALUE self); + static void * loopServer(void* p); + static void stopLoopServer(void* p); + // schema and rb_server to make new RubyCapabilityServer on the fly + static capnp::InterfaceSchema* schema(VALUE self); + static VALUE rb_server(VALUE self); + static VALUE Class; + }; +} + + +#endif /* EZRPC_CAPABILITY_SERVER_H */ diff --git a/ext/capn_proto/call_context.cc b/ext/capn_proto/call_context.cc new file mode 100644 index 0000000..92c48f5 --- /dev/null +++ b/ext/capn_proto/call_context.cc @@ -0,0 +1,67 @@ +#include "ruby_capn_proto.h" +#include "call_context.h" +#include "dynamic_struct_reader.h" +#include "dynamic_struct_builder.h" +#include "class_builder.h" +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::CallContext; + VALUE CallContext::Class; + + void CallContext::Init() { + ClassBuilder("CallContext" , rb_cObject). + defineAlloc(&alloc). + defineMethod("getParams" , &getParams). + defineMethod("releaseParams" , &releaseParams). + defineMethod("getResults" , &getResults). + defineMethod("initResults" , &initResults). + defineMethod("setResults" , &setResults). + store(&Class); + } + + void CallContext::free(WrappedType* p) { + p->~CallContext(); + ruby_xfree(p); + } + + VALUE CallContext::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* CallContext::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE CallContext::create(WrappedType context) { + VALUE rb_obj = alloc(Class); + WrappedType* wrapped_context = unwrap(rb_obj); + *wrapped_context = kj::mv(context); + return rb_obj; + } + + VALUE CallContext::getParams(VALUE self) { + return DynamicStructReader::create(unwrap(self)->getParams(),Qnil); + } + + VALUE CallContext::releaseParams(VALUE self) { + unwrap(self)->releaseParams(); + return Qtrue; + } + + VALUE CallContext::getResults(VALUE self) { + return DynamicStructBuilder::create(unwrap(self)->getResults(), Qnil, Qtrue); + } + + VALUE CallContext::initResults(VALUE self) { + return DynamicStructBuilder::create(unwrap(self)->initResults(), Qnil, Qtrue ); + } + + VALUE CallContext::setResults(VALUE self, VALUE structReader) { + // maybe check the type of structReader? + unwrap(self)->setResults( *DynamicStructReader::unwrap(structReader) ); + return Qtrue; + } +} diff --git a/ext/capn_proto/call_context.h b/ext/capn_proto/call_context.h new file mode 100644 index 0000000..773dad3 --- /dev/null +++ b/ext/capn_proto/call_context.h @@ -0,0 +1,25 @@ +#ifndef CONTEXT_H +#define CONTEXT_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + class CallContext { + public: + using WrappedType = capnp::CallContext; + static void Init(); + static VALUE alloc(VALUE klass); + static void free(WrappedType* p); + static VALUE create(WrappedType context); + static WrappedType* unwrap(VALUE self); + static VALUE getParams(VALUE self); + static VALUE releaseParams(VALUE self); + static VALUE getResults(VALUE self); + static VALUE initResults(VALUE self); + static VALUE setResults(VALUE self, VALUE structBuilder); + + static VALUE Class; + }; +} + +#endif /* CONTEXT_H */ diff --git a/ext/capn_proto/capability_client.cc b/ext/capn_proto/capability_client.cc new file mode 100644 index 0000000..64e64f2 --- /dev/null +++ b/ext/capn_proto/capability_client.cc @@ -0,0 +1,47 @@ +#include "ruby_capn_proto.h" +#include "capability_client.h" +#include "dynamic_capability_client.h" +#include "exception.h" +#include "class_builder.h" +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::Capability::Client; + VALUE CapabilityClient::Class; + + void CapabilityClient::Init() { + ClassBuilder("CapabilityClient", rb_cObject). + defineAlloc(&alloc). + defineMethod("to_dynamic" , &to_dynamic). + store(&Class); + } + + void CapabilityClient::free(WrappedType* p) { + p->~Client(); + ruby_xfree(p); + } + + VALUE CapabilityClient::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* CapabilityClient::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE CapabilityClient::create(WrappedType native_client ) { + + VALUE self = alloc(Class); + WrappedType* rb_self = unwrap(self); + new (rb_self) capnp::Capability::Client(native_client); + + return self; + } + + VALUE CapabilityClient::to_dynamic(VALUE self, VALUE schema){ + return DynamicCapabilityClient::create(self,schema); + } + +} diff --git a/ext/capn_proto/capability_client.h b/ext/capn_proto/capability_client.h new file mode 100644 index 0000000..6207520 --- /dev/null +++ b/ext/capn_proto/capability_client.h @@ -0,0 +1,21 @@ +#ifndef CAPABILITY_CLIENT_H +#define CAPABILITY_CLIENT_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + class CapabilityClient { + public: + using WrappedType = capnp::Capability::Client; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(WrappedType native_client); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + static VALUE to_dynamic(VALUE self, VALUE schema); + static VALUE Class; + }; +} + + +#endif /* CAPABILITY_CLIENT_H */ diff --git a/ext/capn_proto/dynamic_capability_client.cc b/ext/capn_proto/dynamic_capability_client.cc new file mode 100644 index 0000000..3a10e17 --- /dev/null +++ b/ext/capn_proto/dynamic_capability_client.cc @@ -0,0 +1,73 @@ +#include "ruby_capn_proto.h" +#include "capability_client.h" +#include "dynamic_capability_client.h" +#include "interface_schema.h" +#include "interface_method.h" +#include "remote_promise.h" +#include "exception.h" +#include "class_builder.h" +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::DynamicCapability::Client; + VALUE DynamicCapabilityClient::Class; + + void DynamicCapabilityClient::Init() { + ClassBuilder("DynamicCapabilityClient", rb_cObject). + defineAlloc(&alloc). + defineMethod("request_and_send" , &request_and_send). + store(&Class); + } + + void DynamicCapabilityClient::free(WrappedType* p) { + p->~Client(); + ruby_xfree(p); + } + + VALUE DynamicCapabilityClient::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* DynamicCapabilityClient::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE DynamicCapabilityClient::create(VALUE client, VALUE interschema) { + + VALUE self = alloc(Class); + WrappedType* rb_self = unwrap(self); + auto dyncap = CapabilityClient::unwrap(client)->castAs(*InterfaceSchema::unwrap(interschema)); + new (rb_self) capnp::DynamicCapability::Client(dyncap); + + return self; + } + + VALUE DynamicCapabilityClient::create(WrappedType native_client){ + VALUE self = alloc(Class); + WrappedType* rb_self = unwrap(self); + new (rb_self) capnp::DynamicCapability::Client(native_client); + + return self; + } + + VALUE DynamicCapabilityClient::request_and_send(VALUE self , VALUE rb_method , VALUE arrays){ + // have, a method and a list of lists each list containing a value to set + // return, a remote promise. + // Data must be a array of arrays + // arrays must be like ['expression','literal','3'] + // this will set in the expression param literal = 3 + try{ + auto* method = InterfaceMethod::unwrap(rb_method); + auto request = unwrap(self)->newRequest(*method); + + RemotePromise::setParam(&request,arrays); + + capnp::RemotePromise r = request.send(); + return RemotePromise::create(r); + }catch(kj::Exception e){ + Exception::raise(e); + } + } +} diff --git a/ext/capn_proto/dynamic_capability_client.h b/ext/capn_proto/dynamic_capability_client.h new file mode 100644 index 0000000..6620e84 --- /dev/null +++ b/ext/capn_proto/dynamic_capability_client.h @@ -0,0 +1,22 @@ +#ifndef DYNAMIC_CAPABILITY_CLIENT_H +#define DYNAMIC_CAPABILITY_CLIENT_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + class DynamicCapabilityClient { + public: + using WrappedType = capnp::DynamicCapability::Client; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(VALUE capclient, VALUE schema); + static VALUE create(WrappedType native_dynclient); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + static VALUE request_and_send(VALUE self, VALUE method, VALUE data); + static VALUE Class; + }; +} + + +#endif /* DYNAMIC_CAPABILITY_CLIENT_H */ diff --git a/ext/capn_proto/dynamic_struct_builder.cc b/ext/capn_proto/dynamic_struct_builder.cc index d7d857a..cb29db4 100644 --- a/ext/capn_proto/dynamic_struct_builder.cc +++ b/ext/capn_proto/dynamic_struct_builder.cc @@ -5,6 +5,9 @@ #include "dynamic_value_builder.h" #include "message_builder.h" #include "malloc_message_builder.h" +#include "interface_schema.h" +#include "ruby_capability_server.h" +#include "dynamic_capability_client.h" #include "class_builder.h" #include "exception.h" #include "util.h" @@ -125,6 +128,12 @@ namespace ruby_capn_proto { unwrap(self)->set(field, *DynamicStructReader::unwrap(rb_obj)); } else if (rb_equal(klass, DynamicStructBuilder::Class)) { unwrap(self)->set(field, (*DynamicStructBuilder::unwrap(rb_obj)).asReader()); + }else if (Qnil != rb_iv_get(rb_obj,"@schema")){ + VALUE interschema = rb_iv_get(rb_obj,"@schema"); + auto c_schema = InterfaceSchema::unwrap(interschema); + unwrap(self)->set(field, kj::heap(*c_schema, rb_obj)); + }else if (rb_equal(klass, DynamicCapabilityClient::Class)){ + unwrap(self)->set(field, *DynamicCapabilityClient::unwrap(rb_obj)); } else { // TODO: raise "Non primitive type" } diff --git a/ext/capn_proto/dynamic_value_reader.cc b/ext/capn_proto/dynamic_value_reader.cc index 8444e7c..5e75cc6 100644 --- a/ext/capn_proto/dynamic_value_reader.cc +++ b/ext/capn_proto/dynamic_value_reader.cc @@ -3,11 +3,14 @@ #include "dynamic_list_reader.h" #include "dynamic_struct_reader.h" #include "dynamic_object_reader.h" +#include "dynamic_capability_client.h" namespace ruby_capn_proto { VALUE DynamicValueReader::to_ruby(capnp::DynamicValue::Reader value, VALUE parent) { switch (value.getType()) { + case capnp::DynamicValue::CAPABILITY: + return DynamicCapabilityClient::create(value.as()); case capnp::DynamicValue::BOOL: return value.as() ? Qtrue : Qfalse; case capnp::DynamicValue::INT: diff --git a/ext/capn_proto/exception.cc b/ext/capn_proto/exception.cc index 2907983..d881f80 100644 --- a/ext/capn_proto/exception.cc +++ b/ext/capn_proto/exception.cc @@ -27,4 +27,8 @@ namespace ruby_capn_proto { rb_exc_raise(create(exception)); return Qnil; } + + void * Exception::raise(void * exception) { + rb_exc_raise(create(*((kj::Exception*)exception))); + } } diff --git a/ext/capn_proto/exception.h b/ext/capn_proto/exception.h index 8781e66..60b1546 100644 --- a/ext/capn_proto/exception.h +++ b/ext/capn_proto/exception.h @@ -10,6 +10,7 @@ namespace ruby_capn_proto { static void Init(); static VALUE create(WrappedType exception); static VALUE raise(WrappedType exception); + static void * raise(void * exception); static VALUE Class; }; diff --git a/ext/capn_proto/extconf.rb b/ext/capn_proto/extconf.rb index 3f35273..ed36a9c 100644 --- a/ext/capn_proto/extconf.rb +++ b/ext/capn_proto/extconf.rb @@ -9,6 +9,8 @@ abort "*** A C++ library with support for C++11 features is required." end +$CXXFLAGS = ' -std=c++11 ' + CONFIG['CXX'] = ENV['CXX'] || CONFIG['CXX'] CONFIG['CXXFLAGS'] = [(ENV['CXXFLAGS'] || CONFIG['CXXFLAGS']), compiler.std_flag, @@ -23,5 +25,8 @@ $LDFLAGS += " -lcapnpc" $LDFLAGS += " -lcapnp" $LDFLAGS += " -lkj" +$LDFLAGS += " -lcapnp-rpc" +$LDFLAGS += " -lkj-async" + create_makefile('capn_proto/capn_proto') diff --git a/ext/capn_proto/init.cc b/ext/capn_proto/init.cc index 95742bd..aec08de 100644 --- a/ext/capn_proto/init.cc +++ b/ext/capn_proto/init.cc @@ -5,6 +5,15 @@ #include "schema_parser.h" #include "parsed_schema.h" #include "struct_schema.h" +#include "interface_schema.h" +#include "interface_method.h" + +#include "capability_client.h" +#include "remote_promise.h" +#include "call_context.h" +#include "EzRpc_server.h" +#include "EzRpc_client.h" +#include "dynamic_capability_client.h" #include "schema_node_reader.h" #include "nested_node_reader.h" #include "list_nested_node_reader.h" @@ -32,6 +41,15 @@ extern "C" { SchemaParser::Init(); ParsedSchema::Init(); StructSchema::Init(); + InterfaceSchema::Init(); + InterfaceMethod::Init(); + + CapabilityClient::Init(); + RemotePromise::Init(); + CallContext::Init(); + EzRpcCapabilityServer::Init(); + EzRpcCapabilityClient::Init(); + DynamicCapabilityClient::Init(); SchemaNodeReader::Init(); NestedNodeReader::Init(); ListNestedNodeReader::Init(); diff --git a/ext/capn_proto/interface_method.cc b/ext/capn_proto/interface_method.cc new file mode 100644 index 0000000..1854824 --- /dev/null +++ b/ext/capn_proto/interface_method.cc @@ -0,0 +1,38 @@ +#include "ruby_capn_proto.h" +#include "interface_method.h" +#include "class_builder.h" +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::InterfaceSchema::Method; + VALUE InterfaceMethod::Class; + + void InterfaceMethod::Init() { + ClassBuilder("InterfaceMethod", rb_cObject). + defineAlloc(&alloc). + // no methods... + store(&Class); + } + + void InterfaceMethod::free(WrappedType* p) { + p->~Method(); + ruby_xfree(p); + } + + VALUE InterfaceMethod::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* InterfaceMethod::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE InterfaceMethod::create(WrappedType method) { + VALUE rb_obj = alloc(Class); + WrappedType* wrapped_method = unwrap(rb_obj); + *wrapped_method = kj::mv(method); + return rb_obj; + } +} diff --git a/ext/capn_proto/interface_method.h b/ext/capn_proto/interface_method.h new file mode 100644 index 0000000..428c667 --- /dev/null +++ b/ext/capn_proto/interface_method.h @@ -0,0 +1,21 @@ +#ifndef INTERFACE_METHOD_SCHEMA_H +#define INTERFACE_METHOD_SCHEMA_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + class InterfaceMethod { + public: + using WrappedType = capnp::InterfaceSchema::Method; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(WrappedType method); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + // no methods... + static VALUE Class; + }; +} + + +#endif /* INTERFACE_METHOD_SCHEMA_H */ diff --git a/ext/capn_proto/interface_schema.cc b/ext/capn_proto/interface_schema.cc new file mode 100644 index 0000000..8aa49c8 --- /dev/null +++ b/ext/capn_proto/interface_schema.cc @@ -0,0 +1,51 @@ +#include "ruby_capn_proto.h" +#include "interface_schema.h" +#include "interface_method.h" +#include "class_builder.h" +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::InterfaceSchema; + VALUE InterfaceSchema::Class; + + void InterfaceSchema::Init() { + ClassBuilder("InterfaceSchema", rb_cObject). + defineAlloc(&alloc). + defineMethod("find_method_by_name" , &find_method_by_name). + store(&Class); + } + + void InterfaceSchema::free(WrappedType* p) { + p->~InterfaceSchema(); + ruby_xfree(p); + } + + VALUE InterfaceSchema::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* InterfaceSchema::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE InterfaceSchema::create(WrappedType schema, VALUE parent) { + VALUE rb_obj = alloc(Class); + WrappedType* wrapped_schema = unwrap(rb_obj); + *wrapped_schema = kj::mv(schema); + + rb_iv_set(rb_obj, "parent", parent); + + return rb_obj; + } + + VALUE InterfaceSchema::find_method_by_name(VALUE self, VALUE name){ + capnp::InterfaceSchema::Method value; + KJ_IF_MAYBE(value, unwrap(self)->findMethodByName(Util::toString(name))) { + return InterfaceMethod::create(*value); + } else { + return Qfalse; + } + } +} diff --git a/ext/capn_proto/interface_schema.h b/ext/capn_proto/interface_schema.h new file mode 100644 index 0000000..4040260 --- /dev/null +++ b/ext/capn_proto/interface_schema.h @@ -0,0 +1,26 @@ +#ifndef INTERFACE_SCHEMA_H +#define INTERFACE_SCHEMA_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + class InterfaceSchema { + public: + using WrappedType = capnp::InterfaceSchema; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(WrappedType schema, VALUE parent); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + // commented methods below don't are strictly needed. + //static VALUE getMethodByName(VALUE self, VALUE name); + //static VALUE getMethods(VALUE self); + static VALUE find_method_by_name(VALUE self, VALUE name); + + + static VALUE Class; + }; +} + + +#endif /* INTERFACE_SCHEMA_H */ diff --git a/ext/capn_proto/parsed_schema.cc b/ext/capn_proto/parsed_schema.cc index ba60a0e..816689e 100644 --- a/ext/capn_proto/parsed_schema.cc +++ b/ext/capn_proto/parsed_schema.cc @@ -2,6 +2,7 @@ #include "parsed_schema.h" #include "schema_node_reader.h" #include "struct_schema.h" +#include "interface_schema.h" #include "class_builder.h" #include "util.h" @@ -15,6 +16,7 @@ namespace ruby_capn_proto { defineMethod("get_proto", &get_proto). defineMethod("get_nested", &get_nested). defineMethod("as_struct", &as_struct). + defineMethod("as_interface", &as_interface). store(&Class); } @@ -55,4 +57,9 @@ namespace ruby_capn_proto { VALUE ParsedSchema::as_struct(VALUE self) { return StructSchema::create(unwrap(self)->asStruct(), self); } + + VALUE ParsedSchema::as_interface(VALUE self) { + return InterfaceSchema::create(unwrap(self)->asInterface(), self); + } + } diff --git a/ext/capn_proto/parsed_schema.h b/ext/capn_proto/parsed_schema.h index fdc317a..1569749 100644 --- a/ext/capn_proto/parsed_schema.h +++ b/ext/capn_proto/parsed_schema.h @@ -14,7 +14,8 @@ namespace ruby_capn_proto { static WrappedType* unwrap(VALUE self); static VALUE get_proto(VALUE self); static VALUE get_nested(VALUE self, VALUE name); - static VALUE as_struct(VALUE name); + static VALUE as_struct(VALUE self); + static VALUE as_interface(VALUE self); static VALUE Class; }; diff --git a/ext/capn_proto/remote_promise.cc b/ext/capn_proto/remote_promise.cc new file mode 100644 index 0000000..093096f --- /dev/null +++ b/ext/capn_proto/remote_promise.cc @@ -0,0 +1,116 @@ +#include "ruby_capn_proto.h" +#include "remote_promise.h" +#include "dynamic_value_builder.h" +#include "dynamic_struct_builder.h" +#include "dynamic_struct_reader.h" +#include "EzRpc_client.h" +#include "interface_method.h" +#include "exception.h" +#include "class_builder.h" +#include +#include "util.h" + +namespace ruby_capn_proto { + using WrappedType = capnp::RemotePromise; + VALUE RemotePromise::Class; + + void RemotePromise::Init() { + + ClassBuilder("RemotePromise", rb_cObject). + defineAlloc(&alloc). + defineMethod("request_and_send" , &request_and_send). + defineMethod("wait" , &wait). + store(&Class); + } + + void RemotePromise::free(WrappedType* p) { + p->~RemotePromise(); + ruby_xfree(p); + } + + VALUE RemotePromise::alloc(VALUE klass) { + return Data_Wrap_Struct(klass, NULL, free, ruby_xmalloc(sizeof(WrappedType))); + } + + WrappedType* RemotePromise::unwrap(VALUE self) { + WrappedType* p; + Data_Get_Struct(self, WrappedType, p); + return p; + } + + VALUE RemotePromise::create(WrappedType& remote_promise) { + + VALUE rb_obj = alloc(Class); + WrappedType* rb_promise = unwrap(rb_obj); + new (rb_promise) WrappedType(kj::mv(remote_promise)); + + return rb_obj; + } + + VALUE RemotePromise::request_and_send(VALUE self, VALUE name_struct, VALUE method, VALUE data){ + VALUE rb_client = rb_iv_get(self,"client"); + try{ + auto pipelinedClient = unwrap(self)->get(Util::toString(name_struct)).releaseAs(); + auto request = pipelinedClient.newRequest(*InterfaceMethod::unwrap(method)); + setParam(&request,data); + auto promise = request.send(); + VALUE new_remote_promise = create(promise); + return new_remote_promise; + }catch( kj::Exception t){ + Exception::raise(t); + } + } + + VALUE RemotePromise::wait(VALUE self, VALUE ezrpclient){ + VALUE client = ezrpclient; + + waitpacket p; + p.prom = unwrap(self); + p.client = EzRpcCapabilityClient::unwrap(client); + p.response = NULL; + + // call waitIntern releasing the GIL + rb_thread_call_without_gvl(waitIntern, &p, RUBY_UBF_IO , 0); + return DynamicStructReader::create(*p.response,Qnil); + } + + void * RemotePromise::waitIntern(void * p){ + try { + waitpacket* pkt = (waitpacket*) p; + auto& waitscope = pkt->client->getWaitScope(); + pkt->response = new capnp::Response(pkt->prom->wait(waitscope)); + }catch(kj::Exception t){ + // adquire the lock to raise an exception on ruby + rb_thread_call_with_gvl(&Exception::raise,&t); + } + } + + void RemotePromise::setParam(capnp::Request* request, VALUE arys){ + VALUE mainIter = rb_ary_pop(arys); // mainIter is now a array + while(mainIter != Qnil ){ + + VALUE val = rb_ary_pop(mainIter); // value to assign + VALUE last = rb_ary_pop(mainIter); // name of the field to assign to val + VALUE temp = rb_ary_shift(mainIter); // just to iterate + + capnp::DynamicStruct::Builder builder = *request; + + // follow the nodes indicated by the array + while( temp != Qnil && temp != last){ + try{ + builder = *DynamicStructBuilder::unwrap(DynamicValueBuilder::to_ruby(request->get(Util::toString(temp)),Qnil)); + temp = rb_ary_shift(mainIter); + }catch(kj::Exception t){ + Exception::raise(t); + } + } + + // when arrived to last node make the assignation + VALUE rb_struct = DynamicStructBuilder::create(builder,Qnil,Qfalse); + DynamicStructBuilder::set(rb_struct,last,val); + + mainIter = rb_ary_pop(arys); + } + } + +} diff --git a/ext/capn_proto/remote_promise.h b/ext/capn_proto/remote_promise.h new file mode 100644 index 0000000..2c93488 --- /dev/null +++ b/ext/capn_proto/remote_promise.h @@ -0,0 +1,30 @@ +#ifndef REMOTE_PROMISE_H +#define REMOTE_PROMISE_H + +#include "ruby_capn_proto.h" + +namespace ruby_capn_proto { + typedef struct _waitpacket { + capnp::EzRpcClient* client; + capnp::RemotePromise* prom; + capnp::Response* response; + } waitpacket; + + class RemotePromise { + public: + using WrappedType = capnp::RemotePromise; + static void Init(); + static VALUE alloc(VALUE klass); + static VALUE create(WrappedType& promise); + static void free(WrappedType* p); + static WrappedType* unwrap(VALUE self); + static VALUE request_and_send(VALUE self, VALUE struct_name, VALUE method, VALUE data); + static void setParam(capnp::Request* request, VALUE arrays); + static VALUE wait(VALUE self, VALUE ezrpclient); + static void * waitIntern(void * p); + static VALUE Class; + }; +} + + +#endif /* REMOTE_PROMISE_H */ diff --git a/ext/capn_proto/ruby_capability_server.cc b/ext/capn_proto/ruby_capability_server.cc new file mode 100644 index 0000000..7408838 --- /dev/null +++ b/ext/capn_proto/ruby_capability_server.cc @@ -0,0 +1,23 @@ +#include "ruby_capn_proto.h" +#include "call_context.h" +#include "ruby_capability_server.h" +#include +#include "util.h" + +namespace capnp { + kj::Promise RubyCapabilityServer::call(InterfaceSchema::Method method, CallContext context) { + // just a dummy c++ server that calls methods on a passed ruby server. + + callPacket packet; + packet.methodName = method.getProto().getName().cStr(); + packet.rb_context = ruby_capn_proto::CallContext::create(context); + packet.rb_server = this->server; + rb_thread_call_with_gvl(this->rbCall, &packet); + return kj::READY_NOW; + } + + void * RubyCapabilityServer::rbCall(void * p){ + auto * packet = (callPacket*) p; + rb_funcall( packet->rb_server, rb_intern( packet->methodName ), 1, packet->rb_context ); + } +} diff --git a/ext/capn_proto/ruby_capability_server.h b/ext/capn_proto/ruby_capability_server.h new file mode 100644 index 0000000..4a375f3 --- /dev/null +++ b/ext/capn_proto/ruby_capability_server.h @@ -0,0 +1,21 @@ +#include "ruby_capn_proto.h" + +namespace capnp { + // helper struct to use rb_thread_call_with_gvl + typedef struct _callPacket { + const char* methodName; + VALUE rb_context; + VALUE rb_server; + } callPacket; + + class RubyCapabilityServer : public DynamicCapability::Server { + + private: + VALUE server; + + public: + RubyCapabilityServer( InterfaceSchema interface, VALUE server ): DynamicCapability::Server(interface) { this->server = server; } + kj::Promise call( InterfaceSchema::Method method, CallContext context ); + static void * rbCall(void * p); // to be called with rb_thread_call_with_gvl + }; +} diff --git a/ext/capn_proto/ruby_capn_proto.h b/ext/capn_proto/ruby_capn_proto.h index 7c71e2b..599b3d3 100644 --- a/ext/capn_proto/ruby_capn_proto.h +++ b/ext/capn_proto/ruby_capn_proto.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -16,6 +17,8 @@ #include #include +#include + #include #ifdef __ #undef __ diff --git a/ext/capn_proto/schema_node_reader.cc b/ext/capn_proto/schema_node_reader.cc index 628a836..ca1e57f 100644 --- a/ext/capn_proto/schema_node_reader.cc +++ b/ext/capn_proto/schema_node_reader.cc @@ -12,6 +12,7 @@ namespace ruby_capn_proto { defineAlloc(&alloc). defineMethod("nested_nodes", &get_nested_nodes). defineMethod("struct?", &is_struct). + defineMethod("interface?", &is_interface). store(&Class); } @@ -48,6 +49,10 @@ namespace ruby_capn_proto { return unwrap(self)->isStruct() ? Qtrue : Qfalse; } + VALUE SchemaNodeReader::is_interface(VALUE self) { + return unwrap(self)->isInterface() ? Qtrue : Qfalse; + } + VALUE SchemaNodeReader::is_enum(VALUE self) { return unwrap(self)->isEnum() ? Qtrue : Qfalse; } diff --git a/ext/capn_proto/schema_node_reader.h b/ext/capn_proto/schema_node_reader.h index 4131584..251ac3a 100644 --- a/ext/capn_proto/schema_node_reader.h +++ b/ext/capn_proto/schema_node_reader.h @@ -14,6 +14,7 @@ namespace ruby_capn_proto { static VALUE get_nested_nodes(VALUE self); static VALUE is_struct(VALUE self); static VALUE is_enum(VALUE self); + static VALUE is_interface(VALUE self); static VALUE Class; }; diff --git a/ext/capn_proto/schema_parser.cc b/ext/capn_proto/schema_parser.cc index 4323ff5..930e356 100644 --- a/ext/capn_proto/schema_parser.cc +++ b/ext/capn_proto/schema_parser.cc @@ -1,6 +1,7 @@ #include "ruby_capn_proto.h" #include "schema_parser.h" #include "parsed_schema.h" +#include "exception.h" #include "class_builder.h" #include "util.h" @@ -39,15 +40,22 @@ namespace ruby_capn_proto { } VALUE SchemaParser::parse_disk_file(VALUE self, VALUE rb_display_name, VALUE rb_disk_path, VALUE rb_import_path) { - auto imports = Util::toStringArray(rb_import_path); - auto importsPtrs = KJ_MAP(s, imports) -> kj::StringPtr { return s; }; - - auto display_name = Util::toString(rb_display_name); - auto schema = unwrap(self)->parseDiskFile( - display_name, - StringValueCStr(rb_disk_path), - importsPtrs - ); - return ParsedSchema::create(schema, self); + try { + + auto imports = Util::toStringArray(rb_import_path); + auto importsPtrs = KJ_MAP(s, imports) -> kj::StringPtr { return s; }; + + auto display_name = Util::toString(rb_display_name); + auto schema = unwrap(self)->parseDiskFile( + display_name, + StringValueCStr(rb_disk_path), + importsPtrs + ); + return ParsedSchema::create(schema, self); + + }catch ( kj::Exception t ){ + Exception::raise(t); + return Qnil; + } } } diff --git a/lib/capn_proto.rb b/lib/capn_proto.rb index 7aee334..6b81335 100644 --- a/lib/capn_proto.rb +++ b/lib/capn_proto.rb @@ -83,6 +83,12 @@ def load_schema(file_name, imports=[]) mod.extend(Struct) end + if node.interface? + interface_schema = schema.as_interface + mod.instance_variable_set(:@schema, interface_schema) + mod.extend(Interface) + end + nested_nodes.each do |nested_node| const_name = nested_node.name const_name[0] = const_name[0].upcase @@ -93,9 +99,9 @@ def load_schema(file_name, imports=[]) end schema = @schema_parser.parse_disk_file( - display_name, - file_name, - imports); + display_name, + file_name, + imports); load_schema_rec.call(schema, self) end @@ -125,5 +131,101 @@ def read_packed_from(io) raise 'not implemented' end end + + module Interface + attr_reader :schema + + def method?(name) + @schema.find_method_by_name name + end + + def method!(name) #short and ruby friendlier alias for find_method_by_name + temp = @schema.find_method_by_name name + if temp + return temp + else + raise "Method #{name} not found in this interface" + end + end + end + end + + class RequestBuilder + attr_reader :data + + def initialize + @data = [] + @currentArray = [] + end + + def method_missing(*args) + if args.length == 1 + @currentArray << args.pop.to_s + return self # to chain methods like .expression.literal(3) + elsif args.length == 2 + @currentArray << args.shift.to_s + @currentArray << args.shift + @data << @currentArray + @currentArray = [] + else + super + end + end + + def wait(waitscope) + @to_request.wait(waitscope) + end + end + + class Request < RequestBuilder + + def initialize( client, method ) + @to_request = client + @method = method + super() + end + + def send + PipelinedRequest.new(@to_request.request_and_send(@method,@data)) + end + + end + + class PipelinedRequest < RequestBuilder + attr_accessor :method + + def initialize( remotePromise ) + @to_request = remotePromise + super() + end + + def get(value) + @value = value + return self # to chain calls like get('value').readRequest + end + + def send + if !@value || !@method + raise "call both get and set method before calling send" + end + @to_request.request_and_send(@value,@method,@data) + end end + + DynamicCapabilityClient.class_eval do + + def request(method) + Request.new(self,method) + end + + end + + class CapabilityServer + + def initialize(interface) + @schema = interface + end + + end + end diff --git a/lib/capn_proto/version.rb b/lib/capn_proto/version.rb index a167ac5..c5d6118 100644 --- a/lib/capn_proto/version.rb +++ b/lib/capn_proto/version.rb @@ -1,3 +1,3 @@ module CapnProto - VERSION = "0.0.1.alpha.8" + VERSION = "0.1.1.alpha.rpc" end diff --git a/tests/hidraCordatus.capnp b/tests/hidraCordatus.capnp new file mode 100644 index 0000000..e93fd22 --- /dev/null +++ b/tests/hidraCordatus.capnp @@ -0,0 +1,16 @@ +# made by Felipe Vieira, gsoc 2016 student + +@0x9dfdcaa625365235; + +struct Task { + dataint @0 :Int32; + madeBy @1 :Text; +} + +interface Employer { + getWorker @0 () -> ( worker :Worker ); +} + +interface Worker { + put23 @0 (taskToProcess :Task) -> (taskProcessed :Task); +} diff --git a/tests/hidraCordatusEmployer.rb b/tests/hidraCordatusEmployer.rb new file mode 100644 index 0000000..0d587ed --- /dev/null +++ b/tests/hidraCordatusEmployer.rb @@ -0,0 +1,33 @@ +require 'capn_proto' +require 'minitest/autorun' + +module Hydra extend CapnProto::SchemaLoader + load_schema('hidraCordatus.capnp') +end + +class Employer < Minitest::Test + def test_push_a_ton_of_tasks + employer_schema = Hydra::Employer.schema + get_worker_method = Hydra::Employer.method! 'getWorker' + put23method = Hydra::Worker.method! 'put23' + + ezclient = CapnProto::EzRpcClient.new("127.0.0.1:1337",employer_schema) + client = ezclient.client + + 100.times do + + #set up the request + request = client.request(get_worker_method) + pipelinedRequest = request.send + pipelinedRequest.get('worker').method = put23method + pipelinedRequest.taskToProcess.dataint(0) + + #get the results + results = pipelinedRequest.send.wait(ezclient) + p results.taskProcessed.dataint + p results.taskProcessed.madeBy + + assert results.taskProcessed.dataint == 23 + end + end +end diff --git a/tests/hidraCordatusMaster.rb b/tests/hidraCordatusMaster.rb new file mode 100644 index 0000000..8761e36 --- /dev/null +++ b/tests/hidraCordatusMaster.rb @@ -0,0 +1,51 @@ +require 'capn_proto' + +module Hydra extend CapnProto::SchemaLoader + load_schema('hidraCordatus.capnp') +end + +class WorkerServer < CapnProto::CapabilityServer + def initialize(i) + @madeBy = "made by worker ##{i}" + super(Hydra::Worker.schema) + end + + def put23(context) + puts "put23 called" + n = context.getParams.taskToProcess.dataint + context.getResults.taskProcessed.dataint = n + 23 + context.getResults.taskProcessed.madeBy = @madeBy + puts "put23 dispatched" + end +end + +class EmployerServer < CapnProto::CapabilityServer + def initialize(wp) + @worker_pool = wp + @currentWorker = 0 + super(Hydra::Employer.schema) + end + + def get_a_Worker + @currentWorker += 1 + @worker_pool[@currentWorker % @worker_pool.size] + end + + def getWorker(context) + puts "getWorker called" + context.getResults.worker = get_a_Worker + puts "getWorker dispatched" + end + +end + + +workers = [] +10.times do |i| + workers << WorkerServer.new(i) +end + + +e = CapnProto::EzRpcServer.new(EmployerServer.new(workers), "*:1337") +puts "serving EmployerServer on 1337..." +e.run