//------------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
//------------------------------------------------------------------------------
namespace Microsoft.Samples.Kinect.Webserver
{
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.WebSockets;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using System.Windows.Threading;
using Microsoft.Samples.Kinect.Webserver.Sensor.Serialization;
/// <summary>
/// Web socket communication channel used for server to call remote procedures exposed
/// by web client and synchronously wait for the call to return with results.
/// </summary>
/// <remarks>
/// <para>
/// All communications performed by this channel are sent/received as UTF8-encoded web
/// socket text messages.
/// </para>
/// <para>
/// We only support one single RPC call to be sent at a time, by any client.
/// Calls initiated before a previous call completes will simply fail.
/// </para>
/// </remarks>
public sealed class WebSocketRpcChannel : WebSocketChannelBase
{
    /// <summary>
    /// Default number of bytes in buffer used to receive RPC responses from client.
    /// </summary>
    private const int DefaultReceiveBufferSize = 2048;

    /// <summary>
    /// Function name used to verify that client connection is still fully operational.
    /// </summary>
    private const string PingFunctionName = "ping";

    /// <summary>
    /// Buffer used to receive RPC responses from client.
    /// </summary>
    /// <remarks>
    /// Replaced by a larger array whenever a response does not fit in the current one.
    /// </remarks>
    private byte[] receiveBuffer;

    /// <summary>
    /// Sequence Id of last function call performed.
    /// </summary>
    /// <remarks>
    /// This is incremented with each function call, to make sequence ids unique.
    /// </remarks>
    private int sequenceId;

    /// <summary>
    /// True if we're currently waiting for an RPC call to come back with a response.
    /// </summary>
    private bool isInCall;

    /// <summary>
    /// Initializes a new instance of the <see cref="WebSocketRpcChannel"/> class.
    /// </summary>
    /// <param name="context">
    /// Web socket context.
    /// </param>
    /// <param name="closedAction">
    /// Action to perform when web socket becomes closed.
    /// </param>
    /// <param name="receiveBufferSize">
    /// Number of bytes in buffer used to receive RPC responses from client.
    /// </param>
    private WebSocketRpcChannel(WebSocketContext context, Action<WebSocketChannelBase> closedAction, int receiveBufferSize)
        : base(context, closedAction)
    {
        this.receiveBuffer = new byte[receiveBufferSize];
    }

    /// <summary>
    /// Attempt to open a new RPC channel from the specified HTTP listener context.
    /// </summary>
    /// <param name="listenerContext">
    /// HTTP listener context.
    /// </param>
    /// <param name="openedAction">
    /// Action to perform when web socket is opened.
    /// Will never be called if web channel can't be opened.
    /// </param>
    /// <param name="closedAction">
    /// Action to perform when web socket is closed.
    /// Will never be called if web channel can't be opened.
    /// </param>
    /// <param name="receiveBufferSize">
    /// Number of bytes in buffer used to receive RPC responses from client.
    /// </param>
    /// <remarks>
    /// <para>
    /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
    /// web socket channel could not be established, an appropriate status code will be
    /// returned via <paramref name="listenerContext"/>'s Response property.
    /// </para>
    /// <para>
    /// This method is <c>async void</c>: it returns to the caller as soon as the first
    /// await yields, and the caller can neither await completion nor observe exceptions
    /// thrown by it. Completion is reported only through <paramref name="openedAction"/>.
    /// </para>
    /// </remarks>
    public static async void TryOpenAsync(
        HttpListenerContext listenerContext,
        Action<WebSocketRpcChannel> openedAction,
        Action<WebSocketRpcChannel> closedAction,
        int receiveBufferSize)
    {
        var socketContext = await HandleWebSocketRequestAsync(listenerContext);

        if (socketContext != null)
        {
            // The channel reports closure with a less specific channel type, so adapt
            // the notification back to the RPC channel type that callers registered for.
            var channel = new WebSocketRpcChannel(
                socketContext, closedChannel => closedAction(closedChannel as WebSocketRpcChannel), receiveBufferSize);

            openedAction(channel);
        }
    }

    /// <summary>
    /// Attempt to open a new RPC channel from the specified HTTP listener context.
    /// </summary>
    /// <param name="listenerContext">
    /// HTTP listener context.
    /// </param>
    /// <param name="openedAction">
    /// Action to perform when web socket is opened.
    /// Will never be called if web channel can't be opened.
    /// </param>
    /// <param name="closedAction">
    /// Action to perform when web socket is closed.
    /// Will never be called if web channel can't be opened.
    /// </param>
    /// <remarks>
    /// <para>
    /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
    /// web socket channel could not be established, an appropriate status code will be
    /// returned via <paramref name="listenerContext"/>'s Response property.
    /// </para>
    /// <para>
    /// Will use default receive buffer size.
    /// </para>
    /// </remarks>
    public static void TryOpenAsync(
        HttpListenerContext listenerContext,
        Action<WebSocketRpcChannel> openedAction,
        Action<WebSocketRpcChannel> closedAction)
    {
        TryOpenAsync(listenerContext, openedAction, closedAction, DefaultReceiveBufferSize);
    }

    /// <summary>
    /// Determine if this web socket channel is open for sending/receiving messages
    /// or if it has been closed
    /// </summary>
    /// <returns>
    /// True if this web socket channel is still open, false otherwise.
    /// </returns>
    /// <remarks>
    /// This call is expected to perform more comprehensive connection state checks
    /// than IsOpen property, which might include sending remote messages, if the
    /// specific subclass warrants it, so callers
    /// should be careful not to call this method too often.
    /// This implementation sends a "ping" RPC call to the client and disposes the
    /// channel if the socket is still open but the ping did not succeed.
    /// </remarks>
    public override bool CheckConnectionStatus()
    {
        if (!base.CheckConnectionStatus())
        {
            return false;
        }

        // A call is already pending on this channel. A ping issued now would fail
        // immediately (only one call at a time is supported), so the base check is
        // taken as sufficient.
        if (this.isInCall)
        {
            return true;
        }

        var result = this.CallFunction<bool>(PingFunctionName);
        var isValidConnection = result.Success && result.Result;

        // If client did not respond to our ping, but socket is still open, close it
        // because we're dealing with a client that does not respect our expected
        // communication contract.
        if (this.IsOpen && !isValidConnection)
        {
            this.Dispose();
        }

        return isValidConnection;
    }

    /// <summary>
    /// Perform synchronous RPC function call.
    /// </summary>
    /// <typeparam name="T">
    /// Type of function call result.
    /// </typeparam>
    /// <param name="functionName">
    /// Name of remote function to invoke.
    /// </param>
    /// <param name="args">
    /// Function arguments.
    /// </param>
    /// <returns>
    /// Result of RPC call. A failed (never null) result is returned if another call is
    /// already in progress, if sending/receiving fails, or if the wait is abandoned
    /// before a response arrives.
    /// </returns>
    /// <remarks>
    /// The call blocks its caller by pushing a nested dispatcher frame on the current
    /// thread's dispatcher, so other dispatcher work keeps being processed while waiting.
    /// </remarks>
    public RpcResult<T> CallFunction<T>(string functionName, params object[] args)
    {
        if (this.isInCall)
        {
            return new RpcResult<T>(false);
        }

        this.isInCall = true;

        try
        {
            var frame = new DispatcherFrame { Continue = true };
            RpcResult<T> result = null;

            try
            {
                // Push dispatcher frame with a single posted message to process before
                // breaking out of frame
                Dispatcher.CurrentDispatcher.BeginInvoke(
                    (Action)(async () =>
                    {
                        try
                        {
                            result = await this.SendReceiveAsync<T>(functionName, args);
                        }
                        catch (Exception e)
                        {
                            Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
                            result = new RpcResult<T>(false);
                        }

                        frame.Continue = false;
                    }));

                Dispatcher.PushFrame(frame);

                // The frame can also be exited from outside (dispatcher shutting down or
                // all frames being asked to exit) before the posted call has completed.
                // No result has been assigned in that case, so report a failed call
                // rather than handing a null reference to the caller.
                return result ?? new RpcResult<T>(false);
            }
            catch (AggregateException e)
            {
                Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
                return new RpcResult<T>(false);
            }
        }
        finally
        {
            this.isInCall = false;
        }
    }

    /// <summary>
    /// Asynchronously send RPC function call request and process response, ensuring that
    /// response matches request.
    /// </summary>
    /// <typeparam name="T">
    /// Type of function call result.
    /// </typeparam>
    /// <param name="functionName">
    /// Name of remote function to invoke.
    /// </param>
    /// <param name="args">
    /// Function arguments.
    /// </param>
    /// <returns>
    /// Result of RPC call, as an await-able task.
    /// </returns>
    private async Task<RpcResult<T>> SendReceiveAsync<T>(string functionName, object[] args)
    {
        var call = new FunctionCallRequest(functionName, args, ++this.sequenceId);

        using (var callStream = new MemoryStream())
        {
            call.ToJson(callStream);

            // GetBuffer exposes the stream's whole internal array, so the segment is
            // limited to the bytes that were actually written.
            var sendResult =
                await
                this.SendAsync(new ArraySegment<byte>(callStream.GetBuffer(), 0, (int)callStream.Length), WebSocketMessageType.Text);

            if (!sendResult)
            {
                return new RpcResult<T>(false);
            }
        }

        var receiveResult = await this.ReceiveCompleteMessageAsync();

        if (receiveResult == null)
        {
            return new RpcResult<T>(false);
        }

        using (var responseStream = new MemoryStream(this.receiveBuffer, 0, receiveResult.Count))
        {
            FunctionCallResponse<T> callResponse;

            try
            {
                callResponse = responseStream.FromJson<FunctionCallResponse<T>>();
            }
            catch (SerializationException)
            {
                return new RpcResult<T>(false);
            }

            if (callResponse.id != call.id)
            {
                // call and response sequence ids don't match, so call did not succeed
                return new RpcResult<T>(false);
            }

            return new RpcResult<T>(true, callResponse.result);
        }
    }

    /// <summary>
    /// Asynchronously wait for RPC function call response, until a complete message has
    /// been received.
    /// </summary>
    /// <returns>
    /// WebSocketReceiveResult if complete, non-empty, text message has been received.
    /// Null if any failure occurred while receiving message.
    /// </returns>
    /// <remarks>
    /// The message bytes are accumulated at the start of the receive buffer, which is
    /// enlarged as needed. The returned result's Count is the total message length.
    /// NOTE(review): buffer growth is not capped, so the message size is effectively
    /// chosen by the remote client -- consider enforcing a maximum.
    /// </remarks>
    private async Task<WebSocketReceiveResult> ReceiveCompleteMessageAsync()
    {
        int receiveCount = 0;

        while (true)
        {
            var receiveResult =
                await
                this.ReceiveAsync(new ArraySegment<byte>(this.receiveBuffer, receiveCount, this.receiveBuffer.Length - receiveCount));

            if ((receiveResult == null) || (receiveResult.MessageType != WebSocketMessageType.Text))
            {
                return null;
            }

            receiveCount += receiveResult.Count;

            if (receiveResult.EndOfMessage)
            {
                break;
            }

            // Message is not complete yet. Only grow the buffer once it is actually full:
            // a partial receive may come back without having filled the buffer, and
            // sizing the new buffer from the received byte count could then shrink it
            // (down to zero length for an empty partial receive), stalling the loop.
            if (receiveCount == this.receiveBuffer.Length)
            {
                // Always grow to a strictly larger size, even from an empty buffer.
                var newBuffer = new byte[Math.Max(this.receiveBuffer.Length * 2, DefaultReceiveBufferSize)];
                Array.Copy(this.receiveBuffer, newBuffer, receiveCount);
                this.receiveBuffer = newBuffer;
            }
        }

        if (receiveCount == 0)
        {
            return null;
        }

        return new WebSocketReceiveResult(receiveCount, WebSocketMessageType.Text, true);
    }
}
}