Line data Source code
1 : //# Registrar.cc: maintain registry of services
2 : //# Copyright (C) 2017
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify it
6 : //# under the terms of the GNU Library General Public License as published by
7 : //# the Free Software Foundation; either version 2 of the License, or (at your
8 : //# option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful, but WITHOUT
11 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13 : //# License for more details.
14 : //#
15 : //# You should have received a copy of the GNU Library General Public License
16 : //# along with this library; if not, write to the Free Software Foundation,
17 : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
18 : //#
19 : //# Correspondence concerning AIPS++ should be addressed as follows:
20 : //# Internet email: aips2-request@nrao.edu.
21 : //# Postal address: AIPS++ Project Office
22 : //# National Radio Astronomy Observatory
23 : //# 520 Edgemont Road
24 : //# Charlottesville, VA 22903-2475 USA
25 : //#
26 : #include <time.h>
27 : #include <stdlib.h>
28 : #include <map>
29 : #include <algorithm>
30 : #include <casatools/Proc/Registrar.h>
31 : #ifdef USE_GRPC
32 : #include <grpc++/grpc++.h>
33 : #include "registrar.grpc.pb.h"
34 : #include "shutdown.grpc.pb.h"
35 : using grpc::Server;
36 : using grpc::ServerBuilder;
37 : using grpc::ServerContext;
38 : using grpc::Status;
39 : #endif
40 :
41 : using std::find_if;
42 :
43 : namespace casatools { /** namespace for CASAtools classes within "CASA code" **/
44 :
45 : #ifdef USE_GRPC
46 :
47 : // generated stubs/base-classes are in casatools::rpc namespace...
48 : class grpcRegistrar final : public rpc::Registrar::Service {
49 :
50 0 : Status add( ServerContext* context, const rpc::ServiceId* req, rpc::ServiceId* reply) override {
51 0 : std::lock_guard<std::mutex> guard(registry_mutex);
52 0 : if ( registry == 0 || req->id( ).size( ) == 0 ||
53 0 : req->types( ).size( ) == 0 || req->uri( ).size( ) == 0 )
54 0 : return Status::CANCELLED;
55 : // ServiceId actual = registry->add(ServiceId(req->id( ),req->uri( ),std::list<std::string>(req->types( ).begin( ),req->types( ).end( ))));
56 0 : std::list<std::string> service_types(req->types( ).begin( ),req->types( ).end( ));
57 0 : ServiceId actual = registry->add(ServiceId(req->id( ),req->uri( ),service_types));
58 0 : reply->set_id(actual.id( ));
59 0 : reply->set_uri(actual.uri( ));
60 : std::for_each( service_types.begin( ), service_types.end( ),
61 0 : [&] (const std::string &s) { reply->add_types(s); } );
62 : // ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
63 : // -bash-4.2$ GRPC_DEBUG=1 /opt/rh/rh-python36/root/usr/bin/python3
64 : // Python 3.6.3 (default, Jan 9 2018, 10:19:07)
65 : // [GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux
66 : // Type "help", "copyright", "credits" or "license" for more information.
67 : // >>> from casatools import ctsys
68 : // registry available at 0.0.0.0:39189
69 : // >>> <1> image-view
70 : // <1> interactive-clean
71 : // <2> image-view
72 : // <2> interactive-clean
73 : // <2> ??
74 : // <2> image-view
75 : // <2> interactive-clean
76 : // <2> ??
77 : // <2> image-view
78 : // <2> interactive-clean
79 : // <2> ??
80 : // <2> image-view
81 : // <2> interactive-clean
82 : // <2> ??
83 : // <2> image-view
84 : // <2> interactive-clean
85 : // <2> ??
86 : // <2> image-view
87 : // <2> interactive-clean
88 : // <2> ??
89 : // <2> image-view
90 : // <2> interactive-clean
91 : // <2> ??
92 : // -bash-4.2$
93 : // ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
94 : // std::for_each( actual.types( ).begin( ), actual.types( ).end( ),
95 : // [&] (const std::string &s) {
96 : // fprintf( stderr, "<2>\t\t%s\n", s.c_str( ) );
97 : // fflush(stderr);
98 : // if ( ++count > 20 ) exit(1);
99 : // reply->add_types(s);
100 : // } );
101 0 : return Status::OK;
102 : }
103 :
104 0 : Status remove( ServerContext* context, const rpc::ServiceId* req, ::google::protobuf::BoolValue* reply) override {
105 0 : std::lock_guard<std::mutex> guard(registry_mutex);
106 0 : if ( registry == 0 || req->id( ).size( ) == 0 )
107 0 : return Status::CANCELLED;
108 0 : reply->set_value(registry->remove(req->id( )));
109 0 : return Status::OK;
110 : }
111 :
112 0 : Status services( ServerContext* context, const ::google::protobuf::Empty* req, rpc::ServiceIds* reply) override {
113 0 : std::list<ServiceId> services;
114 :
115 : {
116 0 : std::lock_guard<std::mutex> guard(registry_mutex);
117 0 : if ( registry == 0 )
118 0 : return Status::CANCELLED;
119 0 : services = registry->services( );
120 : }
121 :
122 0 : for ( std::list<ServiceId>::const_iterator ci = services.begin( ); ci != services.end( ); ++ci ) {
123 0 : rpc::ServiceId *serv = reply->add_service( );
124 0 : serv->set_id(ci->id( ));
125 0 : serv->set_uri(ci->uri( ));
126 0 : std::for_each( ci->types( ).begin( ), ci->types( ).end( ), [&] (const std::string &s) { serv->add_types(s); } );
127 : }
128 :
129 0 : return Status::OK;
130 : }
131 :
132 : std::mutex registry_mutex;
133 : Registrar *registry;
134 :
135 : public:
136 1 : grpcRegistrar( Registrar *r ) : registry(r) { }
137 :
138 : };
139 :
140 : struct grpcState {
141 : std::unique_ptr<Server> server;
142 : std::unique_ptr<grpcRegistrar> reg;
143 1 : ~grpcState( ) {
144 1 : if (getenv("GRPC_DEBUG")) {
145 0 : fprintf(stderr, "stopping registry\n");
146 0 : fflush(stderr);
147 : }
148 1 : if ( server ) server->Shutdown( );
149 1 : }
150 : };
151 : #endif
152 :
153 :
154 1 : Registrar::Registrar( ) {
155 1 : srand(time(0));
156 : #ifdef USE_GRPC
157 1 : grpc_state = 0;
158 1 : grpcState *state = new grpcState;
159 1 : state->reg.reset(new grpcRegistrar(this));
160 2 : ServerBuilder builder;
161 : char address_buf[100];
162 1 : constexpr char address_template[] = "0.0.0.0:%d";
163 1 : snprintf(address_buf,sizeof(address_buf),address_template,0);
164 2 : std::string server_address(address_buf);
165 1 : int selected_port = 0;
166 :
167 : // Listen on the given address without any authentication mechanism.
168 1 : builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &selected_port);
169 : // Register "service" as the instance through which we'll communicate with
170 : // clients. In this case it corresponds to an *synchronous* service.
171 1 : builder.RegisterService(state->reg.get( ));
172 : // Finally assemble the server.
173 1 : state->server = builder.BuildAndStart( );
174 1 : if ( selected_port > 0 ) {
175 : // if an available port can be found, selected_port is set to a value greater than zero
176 1 : snprintf(address_buf,sizeof(address_buf),address_template,selected_port);
177 1 : uri_ = address_buf;
178 1 : if (getenv("GRPC_DEBUG")) std::cerr << "registry available at " << uri_ << std::endl;
179 1 : grpc_state = (void*) state;
180 0 : } else delete state;
181 :
182 : #endif
183 1 : }
184 :
185 1 : Registrar::~Registrar( ) {
186 : #ifdef USE_GRPC
187 1 : for ( auto si = service_list.cbegin( ); si != service_list.cend( ); ++si ) {
188 0 : auto tl = si->types( );
189 0 : for ( auto ti = tl.begin( ); ti != tl.end( ); ++ti ) {
190 0 : if ( *ti == "shutdown" ) {
191 0 : auto uri = si->uri( );
192 0 : if (getenv("GRPC_DEBUG")) {
193 : std::cerr << " ...sending shutdown notification to " <<
194 0 : si->id( ) <<
195 0 : " (" << uri << ")" << std::endl;
196 0 : fflush(stderr);
197 : }
198 0 : grpc::ClientContext context;
199 : std::unique_ptr<casatools::rpc::Shutdown::Stub> proxy =
200 0 : casatools::rpc::Shutdown::NewStub(grpc::CreateChannel(uri, grpc::InsecureChannelCredentials( )));
201 0 : ::google::protobuf::Empty req;
202 0 : ::google::protobuf::Empty resp;
203 0 : proxy->now( &context, req, &resp );
204 0 : break;
205 : }
206 : }
207 : }
208 1 : if ( grpc_state ) delete (grpcState*) grpc_state;
209 : #endif
210 1 : }
211 :
212 0 : bool Registrar::remove( std::string id ) {
213 0 : std::lock_guard<std::mutex> guard(service_list_mutex);
214 0 : auto search = find_if(service_list.begin(), service_list.end(), [=](const ServiceId &sid) { return sid == id; });
215 0 : if ( search == service_list.end( ) )
216 0 : return false;
217 0 : service_list.erase(search);
218 0 : return true;
219 : }
220 :
221 0 : ServiceId Registrar::add( const ServiceId &proposed ) {
222 0 : const int maxindex = 65535;
223 : char buf[8];
224 0 : sprintf( buf, ":%04x", rand( ) % maxindex + 1 );
225 0 : std::lock_guard<std::mutex> guard(service_list_mutex);
226 0 : auto search = find_if(service_list.begin(), service_list.end(), [=](const ServiceId &id) { return id == (proposed.id() + buf); });
227 0 : while ( search != service_list.end( ) ) {
228 0 : sprintf( buf, ":%04x", rand( ) % maxindex + 1 );
229 0 : search = find_if(service_list.begin(), service_list.end(), [=](const ServiceId &id) { return id == (proposed.id() + buf); });
230 : }
231 0 : ServiceId result(proposed.id() + buf, proposed.uri(), proposed.types() );
232 0 : service_list.push_back(result);
233 0 : return result;
234 : }
235 :
236 : }
|