Ignore:
Timestamp:
Feb 22, 2023, 5:52:30 PM (21 months ago)
Author:
alloc
Message:

Added keep alive messages to SSE endpoints

File:
1 edited

Legend:

Unmodified
Added
Removed
  • binary-improvements2/WebServer/src/SSE/AbsEvent.cs

    r402 r409  
    99namespace Webserver.SSE {
    1010        public abstract class AbsEvent {
    11                 private const int EncodingBufferSize = 1024 * 1024;
     11                private const int encodingBufferSize = 1024 * 1024;
     12                private const int keepAliveIntervalSeconds = 10;
     13                private static readonly byte[] keepAliveData = Encoding.UTF8.GetBytes (": KeepAlive\n\n");
    1214
    13                 private readonly SseHandler Parent;
     15                private readonly SseHandler parent;
    1416                public readonly string Name;
    1517
    1618                private readonly byte[] encodingBuffer;
    1719                private readonly StringBuilder stringBuilder = new StringBuilder ();
     20                private DateTime lastMessageSent;
    1821
    1922                private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();
     
    2831                protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
    2932                        Name = _name ?? GetType ().Name;
    30                         Parent = _parent;
     33                        parent = _parent;
    3134                        if (_reuseEncodingBuffer) {
    32                                 encodingBuffer = new byte[EncodingBufferSize];
     35                                encodingBuffer = new byte[encodingBufferSize];
    3336                        }
    3437                }
     
    4346                protected void SendData (string _eventName, string _data) {
    4447                        sendQueue.Enqueue ((_eventName, _data));
    45                         Parent.SignalSendQueue ();
     48                        parent.SignalSendQueue ();
    4649                }
    4750
    4851                public void ProcessSendQueue () {
     52                        bool dataSent = false;
     53                       
    4954                        while (sendQueue.HasData ()) {
    5055                                (string eventName, string data) = sendQueue.Dequeue ();
     
    7681                                }
    7782
    78                                 for (int i = openStreams.Count - 1; i >= 0; i--) {
    79                                         HttpListenerResponse resp = openStreams [i];
    80                                         try {
    81                                                 if (resp.OutputStream.CanWrite) {
    82                                                         resp.OutputStream.Write (buf, 0, bytesToSend);
    83                                                         resp.OutputStream.Flush ();
    84                                                 } else {
    85                                                         currentlyOpen--;
    86                                                         totalClosed++;
     83                                dataSent = true;
    8784
    88                                                         logError ("Can not write to endpoint, closing", true);
    89                                                         openStreams.RemoveAt (i);
    90                                                         resp.Close ();
    91                                                 }
    92                                         } catch (IOException e) {
     85                                sendBufToListeners (buf, bytesToSend);
     86                        }
     87
     88                        DateTime now = DateTime.Now;
     89                        if (dataSent) {
     90                                lastMessageSent = now;
     91                        } else if ((now - lastMessageSent).TotalSeconds >= keepAliveIntervalSeconds) {
     92                                sendBufToListeners (keepAliveData, keepAliveData.Length);
     93                                lastMessageSent = now;
     94                        }
     95                }
     96
     97                private void sendBufToListeners (byte[] _bytes, int _bytesToSend) {
     98                        for (int i = openStreams.Count - 1; i >= 0; i--) {
     99                                HttpListenerResponse resp = openStreams [i];
     100                                try {
     101                                        if (resp.OutputStream.CanWrite) {
     102                                                resp.OutputStream.Write (_bytes, 0, _bytesToSend);
     103                                                resp.OutputStream.Flush ();
     104                                        } else {
    93105                                                currentlyOpen--;
    94106                                                totalClosed++;
    95107
     108                                                logError ("Can not write to endpoint, closing", true);
    96109                                                openStreams.RemoveAt (i);
    97 
    98                                                 if (e.InnerException is SocketException se) {
    99                                                         if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
    100                                                                 logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true);
    101                                                         }
    102                                                 } else {
    103                                                         logError ("IOException while trying to write:", true);
    104                                                         Log.Exception (e);
    105                                                 }
    106                                         } catch (Exception e) {
    107                                                 currentlyOpen--;
    108                                                 totalClosed++;
    109 
    110                                                 openStreams.RemoveAt (i);
    111                                                 logError ("Exception while trying to write:", true);
    112                                                 Log.Exception (e);
    113110                                                resp.Close ();
    114111                                        }
     112                                } catch (IOException e) {
     113                                        currentlyOpen--;
     114                                        totalClosed++;
     115
     116                                        openStreams.RemoveAt (i);
     117
     118                                        if (e.InnerException is SocketException se) {
     119                                                if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
     120                                                        logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true);
     121                                                }
     122                                        } else {
     123                                                logError ("IOException while trying to write:", true);
     124                                                Log.Exception (e);
     125                                        }
     126                                } catch (Exception e) {
     127                                        currentlyOpen--;
     128                                        totalClosed++;
     129
     130                                        openStreams.RemoveAt (i);
     131                                        logError ("Exception while trying to write:", true);
     132                                        Log.Exception (e);
     133                                        resp.Close ();
    115134                                }
    116135                        }
     
    123142                }
    124143
    125                 public virtual int DefaultPermissionLevel () {
    126                         return 0;
    127                 }
     144                public virtual int DefaultPermissionLevel () => 0;
    128145        }
    129146}
Note: See TracChangeset for help on using the changeset viewer.