LCOV - code coverage report
Current view: top level - usr/include/grpcpp/impl/codegen - server_interface.h (source / functions) Hit Total Coverage
Test: ctest_coverage.info Lines: 4 4 100.0 %
Date: 2023-11-06 10:06:49 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015 gRPC authors.
       4             :  *
       5             :  * Licensed under the Apache License, Version 2.0 (the "License");
       6             :  * you may not use this file except in compliance with the License.
       7             :  * You may obtain a copy of the License at
       8             :  *
       9             :  *     http://www.apache.org/licenses/LICENSE-2.0
      10             :  *
      11             :  * Unless required by applicable law or agreed to in writing, software
      12             :  * distributed under the License is distributed on an "AS IS" BASIS,
      13             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14             :  * See the License for the specific language governing permissions and
      15             :  * limitations under the License.
      16             :  *
      17             :  */
      18             : 
      19             : #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
      20             : #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
      21             : 
      22             : #include <grpc/impl/codegen/grpc_types.h>
      23             : #include <grpcpp/impl/codegen/byte_buffer.h>
      24             : #include <grpcpp/impl/codegen/call.h>
      25             : #include <grpcpp/impl/codegen/call_hook.h>
      26             : #include <grpcpp/impl/codegen/completion_queue_tag.h>
      27             : #include <grpcpp/impl/codegen/core_codegen_interface.h>
      28             : #include <grpcpp/impl/codegen/rpc_service_method.h>
      29             : #include <grpcpp/impl/codegen/server_context.h>
      30             : 
      31             : namespace grpc {
      32             : 
      33             : class AsyncGenericService;
      34             : class Channel;
      35             : class GenericServerContext;
      36             : class ServerCompletionQueue;
      37             : class ServerContext;
      38             : class ServerCredentials;
      39             : class Service;
      40             : 
      41             : extern CoreCodegenInterface* g_core_codegen_interface;
      42             : 
      43             : /// Models a gRPC server.
      44             : ///
      45             : /// Servers are configured and started via \a grpc::ServerBuilder.
      46             : namespace internal {
      47             : class ServerAsyncStreamingInterface;
      48             : }  // namespace internal
      49             : 
      50             : class ServerInterface : public internal::CallHook {
      51             :  public:
      52             :   virtual ~ServerInterface() {}
      53             : 
      54             :   /// \a Shutdown does the following things:
      55             :   ///
      56             :   /// 1. Shutdown the server: deactivate all listening ports, mark it in
      57             :   ///    "shutdown mode" so that further call Request's or incoming RPC matches
      58             :   ///    are no longer allowed. Also return all Request'ed-but-not-yet-active
      59             :   ///    calls as failed (!ok). This refers to calls that have been requested
      60             :   ///    at the server by the server-side library or application code but that
      61             :   ///    have not yet been matched to incoming RPCs from the client. Note that
      62             :   ///    this would even include default calls added automatically by the gRPC
      63             :   ///    C++ API without the user's input (e.g., "Unimplemented RPC method")
      64             :   ///
      65             :   /// 2. Block until all rpc method handlers invoked automatically by the sync
      66             :   ///    API finish.
      67             :   ///
      68             :   /// 3. If all pending calls complete (and all their operations are
      69             :   ///    retrieved by Next) before \a deadline expires, this finishes
      70             :   ///    gracefully. Otherwise, forcefully cancel all pending calls associated
      71             :   ///    with the server after \a deadline expires. In the case of the sync API,
      72             :   ///    if the RPC function for a streaming call has already been started and
      73             :   ///    takes a week to complete, the RPC function won't be forcefully
      74             :   ///    terminated (since that would leave state corrupt and incomplete) and
      75             :   ///    the method handler will just keep running (which will prevent the
      76             :   ///    server from completing the "join" operation that it needs to do at
      77             :   ///    shutdown time).
      78             :   ///
      79             :   /// All completion queue associated with the server (for example, for async
      80             :   /// serving) must be shutdown *after* this method has returned:
      81             :   /// See \a ServerBuilder::AddCompletionQueue for details.
      82             :   /// They must also be drained (by repeated Next) after being shutdown.
      83             :   ///
      84             :   /// \param deadline How long to wait until pending rpcs are forcefully
      85             :   /// terminated.
      86             :   template <class T>
      87             :   void Shutdown(const T& deadline) {
      88             :     ShutdownInternal(TimePoint<T>(deadline).raw_time());
      89             :   }
      90             : 
      91             :   /// Shutdown the server without a deadline and forced cancellation.
      92             :   ///
      93             :   /// All completion queue associated with the server (for example, for async
      94             :   /// serving) must be shutdown *after* this method has returned:
      95             :   /// See \a ServerBuilder::AddCompletionQueue for details.
      96           1 :   void Shutdown() {
      97           1 :     ShutdownInternal(
      98           1 :         g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC));
      99           1 :   }
     100             : 
     101             :   /// Block waiting for all work to complete.
     102             :   ///
     103             :   /// \warning The server must be either shutting down or some other thread must
     104             :   /// call \a Shutdown for this function to ever return.
     105             :   virtual void Wait() = 0;
     106             : 
     107             :  protected:
     108             :   friend class ::grpc::Service;
     109             : 
     110             :   /// Register a service. This call does not take ownership of the service.
     111             :   /// The service must exist for the lifetime of the Server instance.
     112             :   virtual bool RegisterService(const grpc::string* host, Service* service) = 0;
     113             : 
     114             :   /// Register a generic service. This call does not take ownership of the
     115             :   /// service. The service must exist for the lifetime of the Server instance.
     116             :   virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
     117             : 
     118             :   /// Tries to bind \a server to the given \a addr.
     119             :   ///
     120             :   /// It can be invoked multiple times.
     121             :   ///
     122             :   /// \param addr The address to try to bind to the server (eg, localhost:1234,
     123             :   /// 192.168.1.1:31416, [::1]:27182, etc.).
     124             :   /// \params creds The credentials associated with the server.
     125             :   ///
     126             :   /// \return bound port number on sucess, 0 on failure.
     127             :   ///
     128             :   /// \warning It's an error to call this method on an already started server.
     129             :   virtual int AddListeningPort(const grpc::string& addr,
     130             :                                ServerCredentials* creds) = 0;
     131             : 
     132             :   /// Start the server.
     133             :   ///
     134             :   /// \param cqs Completion queues for handling asynchronous services. The
     135             :   /// caller is required to keep all completion queues live until the server is
     136             :   /// destroyed.
     137             :   /// \param num_cqs How many completion queues does \a cqs hold.
     138             :   virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
     139             : 
     140             :   virtual void ShutdownInternal(gpr_timespec deadline) = 0;
     141             : 
     142             :   virtual int max_receive_message_size() const = 0;
     143             : 
     144             :   virtual grpc_server* server() = 0;
     145             : 
     146             :   virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
     147             :                                 internal::Call* call) = 0;
     148             : 
     149             :   class BaseAsyncRequest : public internal::CompletionQueueTag {
     150             :    public:
     151             :     BaseAsyncRequest(ServerInterface* server, ServerContext* context,
     152             :                      internal::ServerAsyncStreamingInterface* stream,
     153             :                      CompletionQueue* call_cq,
     154             :                      ServerCompletionQueue* notification_cq, void* tag,
     155             :                      bool delete_on_finalize);
     156             :     virtual ~BaseAsyncRequest();
     157             : 
     158             :     bool FinalizeResult(void** tag, bool* status) override;
     159             : 
     160             :    private:
     161             :     void ContinueFinalizeResultAfterInterception();
     162             : 
     163             :    protected:
     164             :     ServerInterface* const server_;
     165             :     ServerContext* const context_;
     166             :     internal::ServerAsyncStreamingInterface* const stream_;
     167             :     CompletionQueue* const call_cq_;
     168             :     ServerCompletionQueue* const notification_cq_;
     169             :     void* const tag_;
     170             :     const bool delete_on_finalize_;
     171             :     grpc_call* call_;
     172             :     internal::Call call_wrapper_;
     173             :     internal::InterceptorBatchMethodsImpl interceptor_methods_;
     174             :     bool done_intercepting_;
     175             :   };
     176             : 
     177             :   /// RegisteredAsyncRequest is not part of the C++ API
     178             :   class RegisteredAsyncRequest : public BaseAsyncRequest {
     179             :    public:
     180             :     RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
     181             :                            internal::ServerAsyncStreamingInterface* stream,
     182             :                            CompletionQueue* call_cq,
     183             :                            ServerCompletionQueue* notification_cq, void* tag,
     184             :                            const char* name, internal::RpcMethod::RpcType type);
     185             : 
     186             :     virtual bool FinalizeResult(void** tag, bool* status) override {
     187             :       /* If we are done intercepting, then there is nothing more for us to do */
     188             :       if (done_intercepting_) {
     189             :         return BaseAsyncRequest::FinalizeResult(tag, status);
     190             :       }
     191             :       call_wrapper_ = internal::Call(
     192             :           call_, server_, call_cq_, server_->max_receive_message_size(),
     193             :           context_->set_server_rpc_info(name_, type_,
     194             :                                         *server_->interceptor_creators()));
     195             :       return BaseAsyncRequest::FinalizeResult(tag, status);
     196             :     }
     197             : 
     198             :    protected:
     199             :     void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
     200             :                       ServerCompletionQueue* notification_cq);
     201             :     const char* name_;
     202             :     const internal::RpcMethod::RpcType type_;
     203             :   };
     204             : 
     205             :   class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
     206             :    public:
     207             :     NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
     208             :                           ServerInterface* server, ServerContext* context,
     209             :                           internal::ServerAsyncStreamingInterface* stream,
     210             :                           CompletionQueue* call_cq,
     211             :                           ServerCompletionQueue* notification_cq, void* tag)
     212             :         : RegisteredAsyncRequest(
     213             :               server, context, stream, call_cq, notification_cq, tag,
     214             :               registered_method->name(), registered_method->method_type()) {
     215             :       IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
     216             :     }
     217             : 
     218             :     // uses RegisteredAsyncRequest::FinalizeResult
     219             :   };
     220             : 
     221             :   template <class Message>
     222             :   class PayloadAsyncRequest final : public RegisteredAsyncRequest {
     223             :    public:
     224             :     PayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
     225             :                         ServerInterface* server, ServerContext* context,
     226             :                         internal::ServerAsyncStreamingInterface* stream,
     227             :                         CompletionQueue* call_cq,
     228             :                         ServerCompletionQueue* notification_cq, void* tag,
     229             :                         Message* request)
     230             :         : RegisteredAsyncRequest(
     231             :               server, context, stream, call_cq, notification_cq, tag,
     232             :               registered_method->name(), registered_method->method_type()),
     233             :           registered_method_(registered_method),
     234             :           server_(server),
     235             :           context_(context),
     236             :           stream_(stream),
     237             :           call_cq_(call_cq),
     238             :           notification_cq_(notification_cq),
     239             :           tag_(tag),
     240             :           request_(request) {
     241             :       IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
     242             :                    notification_cq);
     243             :     }
     244             : 
     245             :     ~PayloadAsyncRequest() {
     246             :       payload_.Release();  // We do not own the payload_
     247             :     }
     248             : 
     249             :     bool FinalizeResult(void** tag, bool* status) override {
     250             :       /* If we are done intercepting, then there is nothing more for us to do */
     251             :       if (done_intercepting_) {
     252             :         return RegisteredAsyncRequest::FinalizeResult(tag, status);
     253             :       }
     254             :       if (*status) {
     255             :         if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
     256             :                                       payload_.bbuf_ptr(), request_)
     257             :                                       .ok()) {
     258             :           // If deserialization fails, we cancel the call and instantiate
     259             :           // a new instance of ourselves to request another call.  We then
     260             :           // return false, which prevents the call from being returned to
     261             :           // the application.
     262             :           g_core_codegen_interface->grpc_call_cancel_with_status(
     263             :               call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
     264             :           g_core_codegen_interface->grpc_call_unref(call_);
     265             :           new PayloadAsyncRequest(registered_method_, server_, context_,
     266             :                                   stream_, call_cq_, notification_cq_, tag_,
     267             :                                   request_);
     268             :           delete this;
     269             :           return false;
     270             :         }
     271             :       }
     272             :       /* Set interception point for recv message */
     273             :       interceptor_methods_.AddInterceptionHookPoint(
     274             :           experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
     275             :       interceptor_methods_.SetRecvMessage(request_);
     276             :       return RegisteredAsyncRequest::FinalizeResult(tag, status);
     277             :     }
     278             : 
     279             :    private:
     280             :     internal::RpcServiceMethod* const registered_method_;
     281             :     ServerInterface* const server_;
     282             :     ServerContext* const context_;
     283             :     internal::ServerAsyncStreamingInterface* const stream_;
     284             :     CompletionQueue* const call_cq_;
     285             : 
     286             :     ServerCompletionQueue* const notification_cq_;
     287             :     void* const tag_;
     288             :     Message* const request_;
     289             :     ByteBuffer payload_;
     290             :   };
     291             : 
     292             :   class GenericAsyncRequest : public BaseAsyncRequest {
     293             :    public:
     294             :     GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
     295             :                         internal::ServerAsyncStreamingInterface* stream,
     296             :                         CompletionQueue* call_cq,
     297             :                         ServerCompletionQueue* notification_cq, void* tag,
     298             :                         bool delete_on_finalize);
     299             : 
     300             :     bool FinalizeResult(void** tag, bool* status) override;
     301             : 
     302             :    private:
     303             :     grpc_call_details call_details_;
     304             :   };
     305             : 
     306             :   template <class Message>
     307             :   void RequestAsyncCall(internal::RpcServiceMethod* method,
     308             :                         ServerContext* context,
     309             :                         internal::ServerAsyncStreamingInterface* stream,
     310             :                         CompletionQueue* call_cq,
     311             :                         ServerCompletionQueue* notification_cq, void* tag,
     312             :                         Message* message) {
     313             :     GPR_CODEGEN_ASSERT(method);
     314             :     new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
     315             :                                      notification_cq, tag, message);
     316             :   }
     317             : 
     318             :   void RequestAsyncCall(internal::RpcServiceMethod* method,
     319             :                         ServerContext* context,
     320             :                         internal::ServerAsyncStreamingInterface* stream,
     321             :                         CompletionQueue* call_cq,
     322             :                         ServerCompletionQueue* notification_cq, void* tag) {
     323             :     GPR_CODEGEN_ASSERT(method);
     324             :     new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
     325             :                               notification_cq, tag);
     326             :   }
     327             : 
     328             :   void RequestAsyncGenericCall(GenericServerContext* context,
     329             :                                internal::ServerAsyncStreamingInterface* stream,
     330             :                                CompletionQueue* call_cq,
     331             :                                ServerCompletionQueue* notification_cq,
     332             :                                void* tag) {
     333             :     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
     334             :                             tag, true);
     335             :   }
     336             : 
     337             :  private:
     338             :   // EXPERIMENTAL
     339             :   // Getter method for the vector of interceptor factory objects.
     340             :   // Returns a nullptr (rather than being pure) since this is a post-1.0 method
     341             :   // and adding a new pure method to an interface would be a breaking change
     342             :   // (even though this is private and non-API)
     343             :   virtual std::vector<
     344             :       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
     345             :   interceptor_creators() {
     346             :     return nullptr;
     347             :   }
     348             : 
     349             :   // EXPERIMENTAL
     350             :   // A method to get the callbackable completion queue associated with this
     351             :   // server. If the return value is nullptr, this server doesn't support
     352             :   // callback operations.
     353             :   // TODO(vjpai): Consider a better default like using a global CQ
     354             :   // Returns nullptr (rather than being pure) since this is a post-1.0 method
     355             :   // and adding a new pure method to an interface would be a breaking change
     356             :   // (even though this is private and non-API)
     357             :   virtual CompletionQueue* CallbackCQ() { return nullptr; }
     358             : };
     359             : 
     360             : }  // namespace grpc
     361             : 
     362             : #endif  // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H

Generated by: LCOV version 1.16