LCOV - code coverage report
Current view: top level - casatools/Proc - Registrar.cc (source / functions) Hit Total Coverage
Test: casacpp_coverage.info Lines: 28 101 27.7 %
Date: 2024-11-06 17:42:47 Functions: 4 15 26.7 %

          Line data    Source code
       1             : //# Registrar.cc: maintain registry of services
       2             : //# Copyright (C) 2017
       3             : //# Associated Universities, Inc. Washington DC, USA.
       4             : //#
       5             : //# This library is free software; you can redistribute it and/or modify it
       6             : //# under the terms of the GNU Library General Public License as published by
       7             : //# the Free Software Foundation; either version 2 of the License, or (at your
       8             : //# option) any later version.
       9             : //#
      10             : //# This library is distributed in the hope that it will be useful, but WITHOUT
      11             : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      12             : //# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
      13             : //# License for more details.
      14             : //#
      15             : //# You should have received a copy of the GNU Library General Public License
      16             : //# along with this library; if not, write to the Free Software Foundation,
      17             : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
      18             : //#
      19             : //# Correspondence concerning AIPS++ should be addressed as follows:
      20             : //#        Internet email: casa-feedback@nrao.edu.
      21             : //#        Postal address: AIPS++ Project Office
      22             : //#                        National Radio Astronomy Observatory
      23             : //#                        520 Edgemont Road
      24             : //#                        Charlottesville, VA 22903-2475 USA
      25             : //#
      26             : #include <time.h>
      27             : #include <stdlib.h>
      28             : #include <map>
      29             : #include <algorithm>
      30             : #include <casatools/Proc/Registrar.h>
      31             : #ifdef USE_GRPC
      32             : #include <grpc++/grpc++.h>
      33             : #include "registrar.grpc.pb.h"
      34             : #include "shutdown.grpc.pb.h"
      35             : using grpc::Server;
      36             : using grpc::ServerBuilder;
      37             : using grpc::ServerContext;
      38             : using grpc::Status;
      39             : #endif
      40             : 
      41             : using std::find_if;
      42             : 
      43             : namespace casatools {   /** namespace for CASAtools classes within "CASA code" **/
      44             : 
      45             : #ifdef USE_GRPC
      46             : 
      47             :     // generated stubs/base-classes are in casatools::rpc namespace...
      48             :     class grpcRegistrar final : public rpc::Registrar::Service {
      49             : 
      50           0 :         Status add( ServerContext* context, const rpc::ServiceId* req, rpc::ServiceId* reply) override {
      51           0 :             std::lock_guard<std::mutex> guard(registry_mutex);
      52           0 :             if ( registry == 0 || req->id( ).size( ) == 0 ||
      53           0 :                  req->types( ).size( ) == 0 || req->uri( ).size( ) == 0 )
      54           0 :                 return Status::CANCELLED;
      55             : //          ServiceId actual = registry->add(ServiceId(req->id( ),req->uri( ),std::list<std::string>(req->types( ).begin( ),req->types( ).end( ))));
      56           0 :             std::list<std::string> service_types(req->types( ).begin( ),req->types( ).end( ));
      57           0 :             ServiceId actual = registry->add(ServiceId(req->id( ),req->uri( ),service_types));
      58           0 :             reply->set_id(actual.id( ));
      59           0 :             reply->set_uri(actual.uri( ));
      60           0 :             std::for_each( service_types.begin( ), service_types.end( ),
      61           0 :                            [&] (const std::string &s) { reply->add_types(s); } );
      62             : // ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
      63             : // -bash-4.2$ GRPC_DEBUG=1 /opt/rh/rh-python36/root/usr/bin/python3
      64             : // Python 3.6.3 (default, Jan  9 2018, 10:19:07) 
      65             : // [GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux
      66             : // Type "help", "copyright", "credits" or "license" for more information.
      67             : // >>> from casatools import ctsys
      68             : // registry available at 0.0.0.0:39189
      69             : // >>> <1>               image-view
      70             : // <1>            interactive-clean
      71             : // <2>            image-view
      72             : // <2>            interactive-clean
      73             : // <2>            ??
      74             : // <2>            image-view
      75             : // <2>            interactive-clean
      76             : // <2>            ??
      77             : // <2>            image-view
      78             : // <2>            interactive-clean
      79             : // <2>            ??
      80             : // <2>            image-view
      81             : // <2>            interactive-clean
      82             : // <2>            ??
      83             : // <2>            image-view
      84             : // <2>            interactive-clean
      85             : // <2>            ??
      86             : // <2>            image-view
      87             : // <2>            interactive-clean
      88             : // <2>            ??
      89             : // <2>            image-view
      90             : // <2>            interactive-clean
      91             : // <2>            ??
      92             : // -bash-4.2$
      93             : // ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
      94             : //            std::for_each( actual.types( ).begin( ), actual.types( ).end( ),
      95             : //                           [&] (const std::string &s) {
      96             : //                               fprintf( stderr, "<2>\t\t%s\n", s.c_str( ) );
      97             : //                               fflush(stderr);
      98             : //                               if ( ++count > 20 ) exit(1);
      99             : //                               reply->add_types(s);
     100             : //                           } );
     101           0 :             return Status::OK;
     102           0 :         }
     103             : 
     104           0 :         Status remove( ServerContext* context, const rpc::ServiceId* req, ::google::protobuf::BoolValue* reply) override {
     105           0 :             std::lock_guard<std::mutex> guard(registry_mutex);
     106           0 :             if ( registry == 0 || req->id( ).size( ) == 0 )
     107           0 :                 return Status::CANCELLED;
     108           0 :             reply->set_value(registry->remove(req->id( )));
     109           0 :             return Status::OK;
     110           0 :         }
     111             : 
     112           0 :         Status services( ServerContext* context, const ::google::protobuf::Empty* req, rpc::ServiceIds* reply) override {
     113           0 :             std::list<ServiceId> services;
     114             : 
     115             :             {
     116           0 :                 std::lock_guard<std::mutex> guard(registry_mutex);
     117           0 :                 if ( registry == 0 )
     118           0 :                     return Status::CANCELLED;
     119           0 :                 services = registry->services( );
     120           0 :             }
     121             : 
     122           0 :             for ( std::list<ServiceId>::const_iterator ci = services.begin( ); ci != services.end( ); ++ci ) {
     123           0 :                 rpc::ServiceId *serv = reply->add_service( );
     124           0 :                 serv->set_id(ci->id( ));
     125           0 :                 serv->set_uri(ci->uri( ));
     126           0 :                 std::for_each( ci->types( ).begin( ), ci->types( ).end( ), [&] (const std::string &s) { serv->add_types(s); } );
     127             :             }
     128             :             
     129           0 :             return Status::OK;
     130           0 :         }
     131             : 
     132             :         std::mutex registry_mutex;
     133             :         Registrar *registry;
     134             : 
     135             :     public:
     136           3 :         grpcRegistrar( Registrar *r ) : registry(r) { }
     137             : 
     138             :     };
     139             : 
     140             :     struct grpcState {
     141             :         std::unique_ptr<Server> server;
     142             :         std::unique_ptr<grpcRegistrar> reg;
     143           3 :         ~grpcState( ) {
     144           3 :             if (getenv("GRPC_DEBUG")) {
     145           0 :                 fprintf(stderr, "stopping registry\n");
     146           0 :                 fflush(stderr);
     147             :             }
     148           3 :             if ( server ) server->Shutdown( );
     149           3 :         }
     150             :     };
     151             : #endif
     152             : 
     153             : 
     154           3 :     Registrar::Registrar( ) {
     155           3 :         srand(time(0));
     156             : #ifdef USE_GRPC
     157           3 :         grpc_state = 0;
     158           3 :         grpcState *state = new grpcState;
     159           3 :         state->reg.reset(new grpcRegistrar(this));
     160           3 :         ServerBuilder builder;
     161             :         char address_buf[100];
     162           3 :         constexpr char address_template[] = "0.0.0.0:%d";
     163           3 :         snprintf(address_buf,sizeof(address_buf),address_template,0);
     164           3 :         std::string server_address(address_buf);
     165           3 :         int selected_port = 0;
     166             : 
     167             :         // Listen on the given address without any authentication mechanism.
     168           3 :         builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &selected_port);
     169             :         // Register "service" as the instance through which we'll communicate with
     170             :         // clients. In this case it corresponds to an *synchronous* service.
     171           3 :         builder.RegisterService(state->reg.get( ));
     172             :         // Finally assemble the server.
     173           3 :         state->server = builder.BuildAndStart( );
     174           3 :         if ( selected_port > 0 ) {
     175             :             // if an available port can be found, selected_port is set to a value greater than zero
     176           3 :             snprintf(address_buf,sizeof(address_buf),address_template,selected_port);
     177           3 :             uri_ = address_buf;
     178           3 :             if (getenv("GRPC_DEBUG")) std::cerr << "registry available at " << uri_ << std::endl;
     179           3 :             grpc_state = (void*) state;
     180           0 :         } else delete state;
     181             : 
     182             : #endif
     183           3 :     }
     184             : 
     185           3 :     Registrar::~Registrar( ) {
     186             : #ifdef USE_GRPC
     187           3 :         for ( auto si = service_list.cbegin( ); si != service_list.cend( ); ++si ) {
     188           0 :             auto tl = si->types( );
     189           0 :             for ( auto ti = tl.begin( ); ti != tl.end( ); ++ti ) {
     190           0 :                 if ( *ti == "shutdown" ) {
     191           0 :                     auto uri = si->uri( );
     192           0 :                     if (getenv("GRPC_DEBUG")) {
     193             :                         std::cerr << "    ...sending shutdown notification to " <<
     194           0 :                             si->id( ) <<
     195           0 :                             " (" << uri << ")" << std::endl;
     196           0 :                         fflush(stderr);
     197             :                     }
     198           0 :                     grpc::ClientContext context;
     199             :                     std::unique_ptr<casatools::rpc::Shutdown::Stub> proxy =
     200           0 :                         casatools::rpc::Shutdown::NewStub(grpc::CreateChannel(uri, grpc::InsecureChannelCredentials( )));
     201           0 :                     ::google::protobuf::Empty req;
     202           0 :                     ::google::protobuf::Empty resp;
     203           0 :                     proxy->now( &context, req, &resp );
     204           0 :                     break;
     205           0 :                 }
     206             :             }
     207           0 :         }
     208           3 :         if ( grpc_state ) delete (grpcState*) grpc_state;
     209             : #endif
     210           3 :     }
     211             :     
     212           0 :     bool Registrar::remove( std::string id ) {
     213           0 :         std::lock_guard<std::mutex> guard(service_list_mutex);
     214           0 :         auto search = find_if(service_list.begin(), service_list.end(), [=](const ServiceId &sid) { return sid == id; });
     215           0 :         if ( search == service_list.end( ) )
     216           0 :             return false;
     217           0 :         service_list.erase(search);
     218           0 :         return true;
     219           0 :     }
     220             : 
     221           0 :     ServiceId Registrar::add( const ServiceId &proposed ) {
     222           0 :         const int maxindex = 65535;
     223             :         char buf[8];
     224           0 :         sprintf( buf, ":%04x", rand( ) % maxindex + 1 );
     225           0 :         std::lock_guard<std::mutex> guard(service_list_mutex);
     226           0 :         auto search = find_if(service_list.begin(), service_list.end(), [=](const ServiceId &id) { return id == (proposed.id() + buf); });
     227           0 :         while ( search != service_list.end( ) ) {
     228           0 :             sprintf( buf, ":%04x", rand( ) % maxindex + 1 );
     229           0 :             search = find_if(service_list.begin(), service_list.end(), [=](const ServiceId &id) { return id == (proposed.id() + buf); });
     230             :         }
     231           0 :         ServiceId result(proposed.id() + buf, proposed.uri(), proposed.types() );
     232           0 :         service_list.push_back(result);
     233           0 :         return result;
     234           0 :     }
     235             : 
     236             : }

Generated by: LCOV version 1.16