Line data Source code
1 : /*
2 : *
3 : * Copyright 2015 gRPC authors.
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : *
17 : */
18 :
19 : #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
20 : #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
21 :
22 : #include <grpc/impl/codegen/grpc_types.h>
23 : #include <grpcpp/impl/codegen/byte_buffer.h>
24 : #include <grpcpp/impl/codegen/call.h>
25 : #include <grpcpp/impl/codegen/call_hook.h>
26 : #include <grpcpp/impl/codegen/completion_queue_tag.h>
27 : #include <grpcpp/impl/codegen/core_codegen_interface.h>
28 : #include <grpcpp/impl/codegen/rpc_service_method.h>
29 : #include <grpcpp/impl/codegen/server_context.h>
30 :
31 : namespace grpc {
32 :
33 : class AsyncGenericService;
34 : class Channel;
35 : class GenericServerContext;
36 : class ServerCompletionQueue;
37 : class ServerContext;
38 : class ServerCredentials;
39 : class Service;
40 :
41 : extern CoreCodegenInterface* g_core_codegen_interface;
42 :
namespace internal {
class ServerAsyncStreamingInterface;
}  // namespace internal

/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
50 : class ServerInterface : public internal::CallHook {
51 : public:
52 : virtual ~ServerInterface() {}
53 :
54 : /// \a Shutdown does the following things:
55 : ///
56 : /// 1. Shutdown the server: deactivate all listening ports, mark it in
57 : /// "shutdown mode" so that further call Request's or incoming RPC matches
58 : /// are no longer allowed. Also return all Request'ed-but-not-yet-active
59 : /// calls as failed (!ok). This refers to calls that have been requested
60 : /// at the server by the server-side library or application code but that
61 : /// have not yet been matched to incoming RPCs from the client. Note that
62 : /// this would even include default calls added automatically by the gRPC
63 : /// C++ API without the user's input (e.g., "Unimplemented RPC method")
64 : ///
65 : /// 2. Block until all rpc method handlers invoked automatically by the sync
66 : /// API finish.
67 : ///
68 : /// 3. If all pending calls complete (and all their operations are
69 : /// retrieved by Next) before \a deadline expires, this finishes
70 : /// gracefully. Otherwise, forcefully cancel all pending calls associated
71 : /// with the server after \a deadline expires. In the case of the sync API,
72 : /// if the RPC function for a streaming call has already been started and
73 : /// takes a week to complete, the RPC function won't be forcefully
74 : /// terminated (since that would leave state corrupt and incomplete) and
75 : /// the method handler will just keep running (which will prevent the
76 : /// server from completing the "join" operation that it needs to do at
77 : /// shutdown time).
78 : ///
79 : /// All completion queue associated with the server (for example, for async
80 : /// serving) must be shutdown *after* this method has returned:
81 : /// See \a ServerBuilder::AddCompletionQueue for details.
82 : /// They must also be drained (by repeated Next) after being shutdown.
83 : ///
84 : /// \param deadline How long to wait until pending rpcs are forcefully
85 : /// terminated.
86 : template <class T>
87 : void Shutdown(const T& deadline) {
88 : ShutdownInternal(TimePoint<T>(deadline).raw_time());
89 : }
90 :
91 : /// Shutdown the server without a deadline and forced cancellation.
92 : ///
93 : /// All completion queue associated with the server (for example, for async
94 : /// serving) must be shutdown *after* this method has returned:
95 : /// See \a ServerBuilder::AddCompletionQueue for details.
96 1 : void Shutdown() {
97 1 : ShutdownInternal(
98 1 : g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC));
99 1 : }
100 :
101 : /// Block waiting for all work to complete.
102 : ///
103 : /// \warning The server must be either shutting down or some other thread must
104 : /// call \a Shutdown for this function to ever return.
105 : virtual void Wait() = 0;
106 :
107 : protected:
108 : friend class ::grpc::Service;
109 :
110 : /// Register a service. This call does not take ownership of the service.
111 : /// The service must exist for the lifetime of the Server instance.
112 : virtual bool RegisterService(const grpc::string* host, Service* service) = 0;
113 :
114 : /// Register a generic service. This call does not take ownership of the
115 : /// service. The service must exist for the lifetime of the Server instance.
116 : virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
117 :
118 : /// Tries to bind \a server to the given \a addr.
119 : ///
120 : /// It can be invoked multiple times.
121 : ///
122 : /// \param addr The address to try to bind to the server (eg, localhost:1234,
123 : /// 192.168.1.1:31416, [::1]:27182, etc.).
124 : /// \param creds The credentials associated with the server.
125 : ///
126 : /// \return bound port number on success, 0 on failure.
127 : ///
128 : /// \warning It's an error to call this method on an already started server.
129 : virtual int AddListeningPort(const grpc::string& addr,
130 : ServerCredentials* creds) = 0;
131 :
132 : /// Start the server.
133 : ///
134 : /// \param cqs Completion queues for handling asynchronous services. The
135 : /// caller is required to keep all completion queues live until the server is
136 : /// destroyed.
137 : /// \param num_cqs How many completion queues does \a cqs hold.
138 : virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
139 :
140 : virtual void ShutdownInternal(gpr_timespec deadline) = 0;
141 :
142 : virtual int max_receive_message_size() const = 0;
143 :
144 : virtual grpc_server* server() = 0;
145 :
146 : virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
147 : internal::Call* call) = 0;
148 :
149 : class BaseAsyncRequest : public internal::CompletionQueueTag {
150 : public:
151 : BaseAsyncRequest(ServerInterface* server, ServerContext* context,
152 : internal::ServerAsyncStreamingInterface* stream,
153 : CompletionQueue* call_cq,
154 : ServerCompletionQueue* notification_cq, void* tag,
155 : bool delete_on_finalize);
156 : virtual ~BaseAsyncRequest();
157 :
158 : bool FinalizeResult(void** tag, bool* status) override;
159 :
160 : private:
161 : void ContinueFinalizeResultAfterInterception();
162 :
163 : protected:
164 : ServerInterface* const server_;
165 : ServerContext* const context_;
166 : internal::ServerAsyncStreamingInterface* const stream_;
167 : CompletionQueue* const call_cq_;
168 : ServerCompletionQueue* const notification_cq_;
169 : void* const tag_;
170 : const bool delete_on_finalize_;
171 : grpc_call* call_;
172 : internal::Call call_wrapper_;
173 : internal::InterceptorBatchMethodsImpl interceptor_methods_;
174 : bool done_intercepting_;
175 : };
176 :
177 : /// RegisteredAsyncRequest is not part of the C++ API
178 : class RegisteredAsyncRequest : public BaseAsyncRequest {
179 : public:
180 : RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
181 : internal::ServerAsyncStreamingInterface* stream,
182 : CompletionQueue* call_cq,
183 : ServerCompletionQueue* notification_cq, void* tag,
184 : const char* name, internal::RpcMethod::RpcType type);
185 :
186 : virtual bool FinalizeResult(void** tag, bool* status) override {
187 : /* If we are done intercepting, then there is nothing more for us to do */
188 : if (done_intercepting_) {
189 : return BaseAsyncRequest::FinalizeResult(tag, status);
190 : }
191 : call_wrapper_ = internal::Call(
192 : call_, server_, call_cq_, server_->max_receive_message_size(),
193 : context_->set_server_rpc_info(name_, type_,
194 : *server_->interceptor_creators()));
195 : return BaseAsyncRequest::FinalizeResult(tag, status);
196 : }
197 :
198 : protected:
199 : void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
200 : ServerCompletionQueue* notification_cq);
201 : const char* name_;
202 : const internal::RpcMethod::RpcType type_;
203 : };
204 :
205 : class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
206 : public:
207 : NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
208 : ServerInterface* server, ServerContext* context,
209 : internal::ServerAsyncStreamingInterface* stream,
210 : CompletionQueue* call_cq,
211 : ServerCompletionQueue* notification_cq, void* tag)
212 : : RegisteredAsyncRequest(
213 : server, context, stream, call_cq, notification_cq, tag,
214 : registered_method->name(), registered_method->method_type()) {
215 : IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
216 : }
217 :
218 : // uses RegisteredAsyncRequest::FinalizeResult
219 : };
220 :
221 : template <class Message>
222 : class PayloadAsyncRequest final : public RegisteredAsyncRequest {
223 : public:
224 : PayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
225 : ServerInterface* server, ServerContext* context,
226 : internal::ServerAsyncStreamingInterface* stream,
227 : CompletionQueue* call_cq,
228 : ServerCompletionQueue* notification_cq, void* tag,
229 : Message* request)
230 : : RegisteredAsyncRequest(
231 : server, context, stream, call_cq, notification_cq, tag,
232 : registered_method->name(), registered_method->method_type()),
233 : registered_method_(registered_method),
234 : server_(server),
235 : context_(context),
236 : stream_(stream),
237 : call_cq_(call_cq),
238 : notification_cq_(notification_cq),
239 : tag_(tag),
240 : request_(request) {
241 : IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
242 : notification_cq);
243 : }
244 :
245 : ~PayloadAsyncRequest() {
246 : payload_.Release(); // We do not own the payload_
247 : }
248 :
249 : bool FinalizeResult(void** tag, bool* status) override {
250 : /* If we are done intercepting, then there is nothing more for us to do */
251 : if (done_intercepting_) {
252 : return RegisteredAsyncRequest::FinalizeResult(tag, status);
253 : }
254 : if (*status) {
255 : if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
256 : payload_.bbuf_ptr(), request_)
257 : .ok()) {
258 : // If deserialization fails, we cancel the call and instantiate
259 : // a new instance of ourselves to request another call. We then
260 : // return false, which prevents the call from being returned to
261 : // the application.
262 : g_core_codegen_interface->grpc_call_cancel_with_status(
263 : call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
264 : g_core_codegen_interface->grpc_call_unref(call_);
265 : new PayloadAsyncRequest(registered_method_, server_, context_,
266 : stream_, call_cq_, notification_cq_, tag_,
267 : request_);
268 : delete this;
269 : return false;
270 : }
271 : }
272 : /* Set interception point for recv message */
273 : interceptor_methods_.AddInterceptionHookPoint(
274 : experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
275 : interceptor_methods_.SetRecvMessage(request_);
276 : return RegisteredAsyncRequest::FinalizeResult(tag, status);
277 : }
278 :
279 : private:
280 : internal::RpcServiceMethod* const registered_method_;
281 : ServerInterface* const server_;
282 : ServerContext* const context_;
283 : internal::ServerAsyncStreamingInterface* const stream_;
284 : CompletionQueue* const call_cq_;
285 :
286 : ServerCompletionQueue* const notification_cq_;
287 : void* const tag_;
288 : Message* const request_;
289 : ByteBuffer payload_;
290 : };
291 :
292 : class GenericAsyncRequest : public BaseAsyncRequest {
293 : public:
294 : GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
295 : internal::ServerAsyncStreamingInterface* stream,
296 : CompletionQueue* call_cq,
297 : ServerCompletionQueue* notification_cq, void* tag,
298 : bool delete_on_finalize);
299 :
300 : bool FinalizeResult(void** tag, bool* status) override;
301 :
302 : private:
303 : grpc_call_details call_details_;
304 : };
305 :
306 : template <class Message>
307 : void RequestAsyncCall(internal::RpcServiceMethod* method,
308 : ServerContext* context,
309 : internal::ServerAsyncStreamingInterface* stream,
310 : CompletionQueue* call_cq,
311 : ServerCompletionQueue* notification_cq, void* tag,
312 : Message* message) {
313 : GPR_CODEGEN_ASSERT(method);
314 : new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
315 : notification_cq, tag, message);
316 : }
317 :
318 : void RequestAsyncCall(internal::RpcServiceMethod* method,
319 : ServerContext* context,
320 : internal::ServerAsyncStreamingInterface* stream,
321 : CompletionQueue* call_cq,
322 : ServerCompletionQueue* notification_cq, void* tag) {
323 : GPR_CODEGEN_ASSERT(method);
324 : new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
325 : notification_cq, tag);
326 : }
327 :
328 : void RequestAsyncGenericCall(GenericServerContext* context,
329 : internal::ServerAsyncStreamingInterface* stream,
330 : CompletionQueue* call_cq,
331 : ServerCompletionQueue* notification_cq,
332 : void* tag) {
333 : new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
334 : tag, true);
335 : }
336 :
337 : private:
338 : // EXPERIMENTAL
339 : // Getter method for the vector of interceptor factory objects.
340 : // Returns a nullptr (rather than being pure) since this is a post-1.0 method
341 : // and adding a new pure method to an interface would be a breaking change
342 : // (even though this is private and non-API)
343 : virtual std::vector<
344 : std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
345 : interceptor_creators() {
346 : return nullptr;
347 : }
348 :
349 : // EXPERIMENTAL
350 : // A method to get the callbackable completion queue associated with this
351 : // server. If the return value is nullptr, this server doesn't support
352 : // callback operations.
353 : // TODO(vjpai): Consider a better default like using a global CQ
354 : // Returns nullptr (rather than being pure) since this is a post-1.0 method
355 : // and adding a new pure method to an interface would be a breaking change
356 : // (even though this is private and non-API)
357 : virtual CompletionQueue* CallbackCQ() { return nullptr; }
358 : };
359 :
360 : } // namespace grpc
361 :
362 : #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
|