Changeset 409 for binary-improvements2/WebServer/src/SSE
- Timestamp:
- Feb 22, 2023, 5:52:30 PM (23 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
binary-improvements2/WebServer/src/SSE/AbsEvent.cs
r402 r409 9 9 namespace Webserver.SSE { 10 10 public abstract class AbsEvent { 11 private const int EncodingBufferSize = 1024 * 1024; 11 private const int encodingBufferSize = 1024 * 1024; 12 private const int keepAliveIntervalSeconds = 10; 13 private static readonly byte[] keepAliveData = Encoding.UTF8.GetBytes (": KeepAlive\n\n"); 12 14 13 private readonly SseHandler Parent;15 private readonly SseHandler parent; 14 16 public readonly string Name; 15 17 16 18 private readonly byte[] encodingBuffer; 17 19 private readonly StringBuilder stringBuilder = new StringBuilder (); 20 private DateTime lastMessageSent; 18 21 19 22 private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> (); … … 28 31 protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) { 29 32 Name = _name ?? GetType ().Name; 30 Parent = _parent;33 parent = _parent; 31 34 if (_reuseEncodingBuffer) { 32 encodingBuffer = new byte[ EncodingBufferSize];35 encodingBuffer = new byte[encodingBufferSize]; 33 36 } 34 37 } … … 43 46 protected void SendData (string _eventName, string _data) { 44 47 sendQueue.Enqueue ((_eventName, _data)); 45 Parent.SignalSendQueue ();48 parent.SignalSendQueue (); 46 49 } 47 50 48 51 public void ProcessSendQueue () { 52 bool dataSent = false; 53 49 54 while (sendQueue.HasData ()) { 50 55 (string eventName, string data) = sendQueue.Dequeue (); … … 76 81 } 77 82 78 for (int i = openStreams.Count - 1; i >= 0; i--) { 79 HttpListenerResponse resp = openStreams [i]; 80 try { 81 if (resp.OutputStream.CanWrite) { 82 resp.OutputStream.Write (buf, 0, bytesToSend); 83 resp.OutputStream.Flush (); 84 } else { 85 currentlyOpen--; 86 totalClosed++; 83 dataSent = true; 87 84 88 logError ("Can not write to endpoint, closing", true); 89 openStreams.RemoveAt (i); 90 resp.Close (); 91 } 92 } catch (IOException e) { 85 sendBufToListeners (buf, bytesToSend); 86 } 87 88 DateTime now = DateTime.Now; 89 if (dataSent) { 90 lastMessageSent = now; 91 } else if ((now - lastMessageSent).TotalSeconds >= keepAliveIntervalSeconds) { 92 sendBufToListeners (keepAliveData, keepAliveData.Length); 93 lastMessageSent = now; 94 } 95 } 96 97 private void sendBufToListeners (byte[] _bytes, int _bytesToSend) { 98 for (int i = openStreams.Count - 1; i >= 0; i--) { 99 HttpListenerResponse resp = openStreams [i]; 100 try { 101 if (resp.OutputStream.CanWrite) { 102 resp.OutputStream.Write (_bytes, 0, _bytesToSend); 103 resp.OutputStream.Flush (); 104 } else { 93 105 currentlyOpen--; 94 106 totalClosed++; 95 107 108 logError ("Can not write to endpoint, closing", true); 96 109 openStreams.RemoveAt (i); 97 98 if (e.InnerException is SocketException se) {99 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {100 logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true);101 }102 } else {103 logError ("IOException while trying to write:", true);104 Log.Exception (e);105 }106 } catch (Exception e) {107 currentlyOpen--;108 totalClosed++;109 110 openStreams.RemoveAt (i);111 logError ("Exception while trying to write:", true);112 Log.Exception (e);113 110 resp.Close (); 114 111 } 112 } catch (IOException e) { 113 currentlyOpen--; 114 totalClosed++; 115 116 openStreams.RemoveAt (i); 117 118 if (e.InnerException is SocketException se) { 119 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) { 120 logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true); 121 } 122 } else { 123 logError ("IOException while trying to write:", true); 124 Log.Exception (e); 125 } 126 } catch (Exception e) { 127 currentlyOpen--; 128 totalClosed++; 129 130 openStreams.RemoveAt (i); 131 logError ("Exception while trying to write:", true); 132 Log.Exception (e); 133 resp.Close (); 115 134 } 116 135 } … … 123 142 } 124 143 125 public virtual int DefaultPermissionLevel () { 126 return 0; 127 } 144 public virtual int DefaultPermissionLevel () => 0; 128 145 } 129 146 }
Note:
See TracChangeset
for help on using the changeset viewer.