//------------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
//------------------------------------------------------------------------------

namespace Microsoft.Samples.Kinect.Webserver
{
    using System;
    using System.Diagnostics;
    using System.IO;
    using System.Net;
    using System.Net.WebSockets;
    using System.Runtime.Serialization;
    using System.Threading.Tasks;
    using System.Windows.Threading;

    using Microsoft.Samples.Kinect.Webserver.Sensor.Serialization;

    /// <summary>
    /// Web socket communication channel used for server to call remote procedures exposed
    /// by web client and synchronously wait for the call to return with results.
    /// </summary>
    /// <remarks>
    /// <para>
    /// All communications performed by this channel are sent/received as UTF8-encoded web
    /// socket text messages.
    /// </para>
    /// <para>
    /// We only support one single RPC call to be sent at a time, by any client.
    /// Calls initiated before a previous call completes will simply fail.
    /// </para>
    /// </remarks>
    public sealed class WebSocketRpcChannel : WebSocketChannelBase
    {
        /// <summary>
        /// Default number of bytes in buffer used to receive RPC responses from client.
        /// </summary>
        private const int DefaultReceiveBufferSize = 2048;

        /// <summary>
        /// Function name used to verify that client connection is still fully operational.
        /// </summary>
        private const string PingFunctionName = "ping";

        /// <summary>
        /// Buffer used to receive RPC responses from client.
        /// </summary>
        /// <remarks>
        /// Not readonly: it is replaced by a larger array whenever a response does not fit.
        /// </remarks>
        private byte[] receiveBuffer;

        /// <summary>
        /// Sequence Id of last function call performed.
        /// </summary>
        /// <remarks>
        /// This is incremented with each function call, to make sequence ids unique.
        /// </remarks>
        private int sequenceId;

        /// <summary>
        /// True if we're currently waiting for an RPC call to come back with a response.
        /// </summary>
        private bool isInCall;

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketRpcChannel"/> class.
        /// </summary>
        /// <param name="context">
        /// Web socket context.
        /// </param>
        /// <param name="closedAction">
        /// Action to perform when web socket becomes closed.
        /// </param>
        /// <param name="receiveBufferSize">
        /// Number of bytes in buffer used to receive RPC responses from client.
        /// </param>
        // NOTE(review): the closedAction argument type is the one expected by the
        // WebSocketChannelBase constructor (declared elsewhere) - confirm it is
        // Action<WebSocketChannelBase>.
        private WebSocketRpcChannel(WebSocketContext context, Action<WebSocketChannelBase> closedAction, int receiveBufferSize)
            : base(context, closedAction)
        {
            this.receiveBuffer = new byte[receiveBufferSize];
        }

        /// <summary>
        /// Attempt to open a new RPC channel from the specified HTTP listener context.
        /// </summary>
        /// <param name="listenerContext">
        /// HTTP listener context.
        /// </param>
        /// <param name="openedAction">
        /// Action to perform when web socket is opened.
        /// Will never be called if web channel can't be opened.
        /// </param>
        /// <param name="closedAction">
        /// Action to perform when web socket is closed.
        /// Will never be called if web channel can't be opened.
        /// </param>
        /// <param name="receiveBufferSize">
        /// Number of bytes in buffer used to receive RPC responses from client.
        /// </param>
        /// <remarks>
        /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
        /// web socket channel could not be established, an appropriate status code will be
        /// returned via <paramref name="listenerContext"/>'s Response property.
        /// </remarks>
        public static async void TryOpenAsync(
            HttpListenerContext listenerContext,
            Action<WebSocketRpcChannel> openedAction,
            Action<WebSocketRpcChannel> closedAction,
            int receiveBufferSize)
        {
            var socketContext = await HandleWebSocketRequestAsync(listenerContext);

            if (socketContext != null)
            {
                // The base class reports closure with a base-typed channel, so adapt the
                // caller's strongly-typed callback before handing it to the constructor.
                var channel = new WebSocketRpcChannel(
                    socketContext,
                    closedChannel => closedAction(closedChannel as WebSocketRpcChannel),
                    receiveBufferSize);

                openedAction(channel);
            }
        }

        /// <summary>
        /// Attempt to open a new RPC channel from the specified HTTP listener context.
        /// </summary>
        /// <param name="listenerContext">
        /// HTTP listener context.
        /// </param>
        /// <param name="openedAction">
        /// Action to perform when web socket is opened.
        /// Will never be called if web channel can't be opened.
        /// </param>
        /// <param name="closedAction">
        /// Action to perform when web socket is closed.
        /// Will never be called if web channel can't be opened.
        /// </param>
        /// <remarks>
        /// <para>
        /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
        /// web socket channel could not be established, an appropriate status code will be
        /// returned via <paramref name="listenerContext"/>'s Response property.
        /// </para>
        /// <para>
        /// Will use default receive buffer size.
        /// </para>
        /// </remarks>
        public static void TryOpenAsync(
            HttpListenerContext listenerContext,
            Action<WebSocketRpcChannel> openedAction,
            Action<WebSocketRpcChannel> closedAction)
        {
            TryOpenAsync(listenerContext, openedAction, closedAction, DefaultReceiveBufferSize);
        }

        /// <summary>
        /// Determine if this web socket channel is open for sending/receiving messages
        /// or if it has been closed.
        /// </summary>
        /// <returns>
        /// True if this web socket channel is still open, false otherwise.
        /// </returns>
        /// <remarks>
        /// This call is expected to perform more comprehensive connection state checks
        /// than IsOpen property, which might include sending remote messages, if the
        /// specific subclass warrants it, so callers should be careful not to call this
        /// method too often. This implementation sends a "ping" RPC call to the client
        /// unless another call is already in progress.
        /// </remarks>
        public override bool CheckConnectionStatus()
        {
            if (!base.CheckConnectionStatus())
            {
                return false;
            }

            if (this.isInCall)
            {
                // A call is already outstanding; we can't ping without breaking the
                // one-call-at-a-time contract, so report the channel as still usable.
                return true;
            }

            var result = this.CallFunction<bool>(PingFunctionName);
            var isValidConnection = result.Success && result.Result;

            // If client did not respond to our ping, but socket is still open, close it
            // because we're dealing with a client that does not respect our expected
            // communication contract.
            if (this.IsOpen && !isValidConnection)
            {
                this.Dispose();
            }

            return isValidConnection;
        }

        /// <summary>
        /// Perform synchronous RPC function call.
        /// </summary>
        /// <typeparam name="T">
        /// Type of function call result.
        /// </typeparam>
        /// <param name="functionName">
        /// Name of remote function to invoke.
        /// </param>
        /// <param name="args">
        /// Function arguments.
        /// </param>
        /// <returns>
        /// Result of RPC call. A failed result is returned if another call is already in
        /// progress, if sending/receiving fails, or if the nested dispatcher frame exits
        /// before a response has been processed.
        /// </returns>
        public RpcResult<T> CallFunction<T>(string functionName, params object[] args)
        {
            if (this.isInCall)
            {
                return new RpcResult<T>(false);
            }

            this.isInCall = true;

            try
            {
                var frame = new DispatcherFrame { Continue = true };
                RpcResult<T> result = null;

                try
                {
                    // Push dispatcher frame with a single posted message to process before
                    // breaking out of frame
                    Dispatcher.CurrentDispatcher.BeginInvoke(
                        (Action)(async () =>
                        {
                            try
                            {
                                result = await this.SendReceiveAsync<T>(functionName, args);
                            }
                            catch (Exception e)
                            {
                                Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
                                result = new RpcResult<T>(false);
                            }
                            finally
                            {
                                // Always release the nested frame, otherwise the caller
                                // would stay blocked inside PushFrame.
                                frame.Continue = false;
                            }
                        }));

                    Dispatcher.PushFrame(frame);

                    // PushFrame can also return because the dispatcher is shutting down,
                    // in which case the callback above may never have produced a result.
                    return result ?? new RpcResult<T>(false);
                }
                catch (AggregateException e)
                {
                    Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
                    return new RpcResult<T>(false);
                }
            }
            finally
            {
                this.isInCall = false;
            }
        }

        /// <summary>
        /// Asynchronously send RPC function call request and process response, ensuring that
        /// response matches request.
        /// </summary>
        /// <typeparam name="T">
        /// Type of function call result.
        /// </typeparam>
        /// <param name="functionName">
        /// Name of remote function to invoke.
        /// </param>
        /// <param name="args">
        /// Function arguments.
        /// </param>
        /// <returns>
        /// Result of RPC call, as an await-able task.
        /// </returns>
        private async Task<RpcResult<T>> SendReceiveAsync<T>(string functionName, object[] args)
        {
            var call = new FunctionCallRequest(functionName, args, ++this.sequenceId);

            using (var callStream = new MemoryStream())
            {
                call.ToJson(callStream);

                // GetBuffer exposes the stream's backing array, which may be longer than
                // the serialized request, so the segment is limited to the stream length.
                var request = new ArraySegment<byte>(callStream.GetBuffer(), 0, (int)callStream.Length);
                var sendResult = await this.SendAsync(request, WebSocketMessageType.Text);

                if (!sendResult)
                {
                    return new RpcResult<T>(false);
                }
            }

            var receiveResult = await this.ReceiveCompleteMessageAsync();

            if (receiveResult == null)
            {
                return new RpcResult<T>(false);
            }

            using (var responseStream = new MemoryStream(this.receiveBuffer, 0, receiveResult.Count))
            {
                FunctionCallResponse<T> callResponse;

                try
                {
                    callResponse = responseStream.FromJson<FunctionCallResponse<T>>();
                }
                catch (SerializationException)
                {
                    return new RpcResult<T>(false);
                }

                if ((callResponse == null) || (callResponse.id != call.id))
                {
                    // Either no response object could be read, or call and response
                    // sequence ids don't match, so call did not succeed
                    return new RpcResult<T>(false);
                }

                return new RpcResult<T>(true, callResponse.result);
            }
        }

        /// <summary>
        /// Asynchronously wait for RPC function call response, until a complete message has
        /// been received.
        /// </summary>
        /// <returns>
        /// WebSocketReceiveResult if complete, non-empty, text message has been received.
        /// Null if any failure occurred while receiving message.
        /// </returns>
        /// <remarks>
        /// The message bytes are accumulated at the start of the receive buffer, which is
        /// replaced by a larger array whenever it fills up before the message ends.
        /// </remarks>
        private async Task<WebSocketReceiveResult> ReceiveCompleteMessageAsync()
        {
            int receiveCount = 0;

            while (true)
            {
                var freeSpace = new ArraySegment<byte>(
                    this.receiveBuffer,
                    receiveCount,
                    this.receiveBuffer.Length - receiveCount);

                WebSocketReceiveResult receiveResult = await this.ReceiveAsync(freeSpace);

                if ((receiveResult == null) || (receiveResult.MessageType != WebSocketMessageType.Text))
                {
                    return null;
                }

                receiveCount += receiveResult.Count;

                if (receiveResult.EndOfMessage)
                {
                    break;
                }

                // Message is still incomplete. Only grow the buffer once it is actually
                // full; if a partial frame left free space, keep receiving into it.
                if (receiveCount >= this.receiveBuffer.Length)
                {
                    // Double the buffer size. A zero-length buffer can't be doubled, so
                    // fall back to the default size to guarantee forward progress.
                    int newSize = this.receiveBuffer.Length == 0
                        ? DefaultReceiveBufferSize
                        : this.receiveBuffer.Length * 2;

                    var newBuffer = new byte[newSize];
                    Array.Copy(this.receiveBuffer, newBuffer, receiveCount);
                    this.receiveBuffer = newBuffer;
                }
            }

            if (receiveCount == 0)
            {
                return null;
            }

            return new WebSocketReceiveResult(receiveCount, WebSocketMessageType.Text, true);
        }
    }
}