source: binary-improvements2/WebServer/src/SSE/AbsEvent.cs@ 430

Last change on this file since 430 was 409, checked in by alloc, 21 months ago

Added keep alive messages to SSE endpoints

File size: 4.2 KB
RevLine 
[391]1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net.Sockets;
5using System.Text;
6using Webserver.UrlHandlers;
7using HttpListenerResponse = SpaceWizards.HttpListener.HttpListenerResponse;
8
9namespace Webserver.SSE {
10 public abstract class AbsEvent {
[409]11 private const int encodingBufferSize = 1024 * 1024;
12 private const int keepAliveIntervalSeconds = 10;
13 private static readonly byte[] keepAliveData = Encoding.UTF8.GetBytes (": KeepAlive\n\n");
[391]14
[409]15 private readonly SseHandler parent;
[391]16 public readonly string Name;
17
18 private readonly byte[] encodingBuffer;
19 private readonly StringBuilder stringBuilder = new StringBuilder ();
[409]20 private DateTime lastMessageSent;
[391]21
22 private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();
23
[402]24 private readonly BlockingQueue<(string _eventName, string _data)> sendQueue =
25 new BlockingQueue<(string _eventName, string _data)> ();
[391]26
27 private int currentlyOpen;
28 private int totalOpened;
29 private int totalClosed;
30
31 protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
32 Name = _name ?? GetType ().Name;
[409]33 parent = _parent;
[391]34 if (_reuseEncodingBuffer) {
[409]35 encodingBuffer = new byte[encodingBufferSize];
[391]36 }
37 }
38
39 public void AddListener (HttpListenerResponse _resp) {
40 totalOpened++;
41 currentlyOpen++;
42
43 openStreams.Add (_resp);
44 }
45
[402]46 protected void SendData (string _eventName, string _data) {
[391]47 sendQueue.Enqueue ((_eventName, _data));
[409]48 parent.SignalSendQueue ();
[391]49 }
50
51 public void ProcessSendQueue () {
[409]52 bool dataSent = false;
53
[391]54 while (sendQueue.HasData ()) {
[402]55 (string eventName, string data) = sendQueue.Dequeue ();
[391]56
57 stringBuilder.Append ("event: ");
58 stringBuilder.AppendLine (eventName);
59 stringBuilder.Append ("data: ");
[402]60 stringBuilder.AppendLine (data);
[391]61
62 stringBuilder.AppendLine ("");
[402]63
[391]64 string output = stringBuilder.ToString ();
65 stringBuilder.Clear ();
66
67 byte[] buf;
68 int bytesToSend;
69 if (encodingBuffer != null) {
70 buf = encodingBuffer;
71 try {
72 bytesToSend = Encoding.UTF8.GetBytes (output, 0, output.Length, buf, 0);
73 } catch (ArgumentException e) {
74 logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
75 Log.Exception (e);
76 return;
77 }
78 } else {
79 buf = Encoding.UTF8.GetBytes (output);
80 bytesToSend = buf.Length;
81 }
82
[409]83 dataSent = true;
[391]84
[409]85 sendBufToListeners (buf, bytesToSend);
86 }
[391]87
[409]88 DateTime now = DateTime.Now;
89 if (dataSent) {
90 lastMessageSent = now;
91 } else if ((now - lastMessageSent).TotalSeconds >= keepAliveIntervalSeconds) {
92 sendBufToListeners (keepAliveData, keepAliveData.Length);
93 lastMessageSent = now;
94 }
95 }
[391]96
[409]97 private void sendBufToListeners (byte[] _bytes, int _bytesToSend) {
98 for (int i = openStreams.Count - 1; i >= 0; i--) {
99 HttpListenerResponse resp = openStreams [i];
100 try {
101 if (resp.OutputStream.CanWrite) {
102 resp.OutputStream.Write (_bytes, 0, _bytesToSend);
103 resp.OutputStream.Flush ();
104 } else {
[391]105 currentlyOpen--;
106 totalClosed++;
107
[409]108 logError ("Can not write to endpoint, closing", true);
[391]109 openStreams.RemoveAt (i);
110 resp.Close ();
111 }
[409]112 } catch (IOException e) {
113 currentlyOpen--;
114 totalClosed++;
115
116 openStreams.RemoveAt (i);
117
118 if (e.InnerException is SocketException se) {
119 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
120 logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true);
121 }
122 } else {
123 logError ("IOException while trying to write:", true);
124 Log.Exception (e);
125 }
126 } catch (Exception e) {
127 currentlyOpen--;
128 totalClosed++;
129
130 openStreams.RemoveAt (i);
131 logError ("Exception while trying to write:", true);
132 Log.Exception (e);
133 resp.Close ();
[391]134 }
135 }
136 }
137
138 protected void logError (string _message, bool _printConnections) {
139 Log.Error (_printConnections
[399]140 ? $"[Web] [SSE] '{Name}': {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
141 : $"[Web] [SSE] '{Name}': {_message}");
[391]142 }
143
[409]144 public virtual int DefaultPermissionLevel () => 0;
[391]145 }
146}
Note: See TracBrowser for help on using the repository browser.