source: TFP-WebServer/WebServer/src/SSE/AbsEvent.cs@ 452

Last change on this file since 452 was 409, checked in by alloc, 21 months ago

Added keep alive messages to SSE endpoints

File size: 4.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net.Sockets;
5using System.Text;
6using Webserver.UrlHandlers;
7using HttpListenerResponse = SpaceWizards.HttpListener.HttpListenerResponse;
8
namespace Webserver.SSE {
	/// <summary>
	/// Base class for a named server-sent-events (SSE) source.
	/// Keeps the list of open client responses, queues outgoing events and writes them
	/// (or a keep-alive comment) to every open response when <see cref="ProcessSendQueue"/> runs.
	/// </summary>
	/// <remarks>
	/// NOTE(review): <c>openStreams</c> and the connection counters are not synchronized. This is only
	/// safe if <see cref="AddListener"/> and <see cref="ProcessSendQueue"/> run on the same thread or
	/// are serialized by the caller - confirm against SseHandler.
	/// </remarks>
	public abstract class AbsEvent {
		// Size of the shared encoding buffer; a single encoded message must fit into it.
		private const int encodingBufferSize = 1024 * 1024;

		// Idle time after which a keep-alive is written to all listeners.
		private const int keepAliveIntervalSeconds = 10;

		// A line starting with ':' is a comment in the SSE stream format and is ignored by clients.
		private static readonly byte[] keepAliveData = Encoding.UTF8.GetBytes (": KeepAlive\n\n");

		private readonly SseHandler parent;
		public readonly string Name;

		// Null when a fresh array is allocated for every message instead of reusing one buffer.
		private readonly byte[] encodingBuffer;
		private readonly StringBuilder stringBuilder = new StringBuilder ();

		// UTC timestamp of the last event or keep-alive written by ProcessSendQueue.
		private DateTime lastMessageSent;

		private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();

		private readonly BlockingQueue<(string _eventName, string _data)> sendQueue =
			new BlockingQueue<(string _eventName, string _data)> ();

		// Connection statistics, only used for log output.
		private int currentlyOpen;
		private int totalOpened;
		private int totalClosed;

		/// <summary>
		/// Creates the event source.
		/// </summary>
		/// <param name="_parent">Handler that gets signalled whenever data is queued for sending.</param>
		/// <param name="_reuseEncodingBuffer">
		/// If true a single buffer of <c>encodingBufferSize</c> bytes is allocated once and reused for every
		/// message; messages that do not fit are dropped with an error. If false a new array is allocated
		/// per message.
		/// </param>
		/// <param name="_name">Name used in log output; defaults to the runtime type name.</param>
		protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
			Name = _name ?? GetType ().Name;
			parent = _parent;
			if (_reuseEncodingBuffer) {
				encodingBuffer = new byte[encodingBufferSize];
			}
		}

		/// <summary>
		/// Registers a response whose output stream will receive all future events of this source.
		/// </summary>
		/// <param name="_resp">Response to write events to.</param>
		public void AddListener (HttpListenerResponse _resp) {
			totalOpened++;
			currentlyOpen++;

			openStreams.Add (_resp);
		}

		/// <summary>
		/// Queues an event for sending and signals the parent handler that the queue has data.
		/// </summary>
		/// <param name="_eventName">Value written to the <c>event:</c> field.</param>
		/// <param name="_data">Value written to the <c>data:</c> field.</param>
		protected void SendData (string _eventName, string _data) {
			sendQueue.Enqueue ((_eventName, _data));
			parent.SignalSendQueue ();
		}

		/// <summary>
		/// Writes all queued events to every open listener. If nothing was sent and at least
		/// <c>keepAliveIntervalSeconds</c> passed since the last write, a keep-alive comment is sent instead.
		/// </summary>
		public void ProcessSendQueue () {
			bool dataSent = false;

			while (sendQueue.HasData ()) {
				(string eventName, string data) = sendQueue.Dequeue ();

				if (!tryEncodeMessage (eventName, data, out byte[] buf, out int bytesToSend)) {
					// The message is already dequeued and can not be encoded; skip only this one
					// so the remaining queued messages are still delivered in this pass.
					continue;
				}

				dataSent = true;

				sendBufToListeners (buf, bytesToSend);
			}

			// UTC so a local clock/DST shift can not distort the keep-alive interval.
			DateTime now = DateTime.UtcNow;
			if (dataSent) {
				lastMessageSent = now;
			} else if ((now - lastMessageSent).TotalSeconds >= keepAliveIntervalSeconds) {
				sendBufToListeners (keepAliveData, keepAliveData.Length);
				lastMessageSent = now;
			}
		}

		/// <summary>
		/// Builds the wire representation of a single event (<c>event:</c> line, <c>data:</c> line, blank line)
		/// and encodes it as UTF-8.
		/// </summary>
		/// <param name="_eventName">Event name.</param>
		/// <param name="_data">Event payload.</param>
		/// <param name="_buffer">Receives the array holding the encoded bytes.</param>
		/// <param name="_byteCount">Receives the number of valid bytes in <paramref name="_buffer"/>.</param>
		/// <returns>False if encoding failed (logged); true otherwise.</returns>
		private bool tryEncodeMessage (string _eventName, string _data, out byte[] _buffer, out int _byteCount) {
			stringBuilder.Append ("event: ");
			stringBuilder.AppendLine (_eventName);
			stringBuilder.Append ("data: ");
			stringBuilder.AppendLine (_data);

			stringBuilder.AppendLine ("");

			string output = stringBuilder.ToString ();
			stringBuilder.Clear ();

			if (encodingBuffer == null) {
				_buffer = Encoding.UTF8.GetBytes (output);
				_byteCount = _buffer.Length;
				return true;
			}

			_buffer = encodingBuffer;
			try {
				_byteCount = Encoding.UTF8.GetBytes (output, 0, output.Length, encodingBuffer, 0);
				return true;
			} catch (ArgumentException e) {
				logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
				Log.Exception (e);
				_byteCount = 0;
				return false;
			}
		}

		/// <summary>
		/// Writes the given bytes to every open listener. Listeners that can not be written to are
		/// removed from the list and their response is closed.
		/// </summary>
		/// <param name="_bytes">Buffer holding the data to send.</param>
		/// <param name="_bytesToSend">Number of bytes of <paramref name="_bytes"/> to send, starting at index 0.</param>
		private void sendBufToListeners (byte[] _bytes, int _bytesToSend) {
			// Iterate backwards so removing the current entry does not shift entries still to be visited.
			for (int i = openStreams.Count - 1; i >= 0; i--) {
				HttpListenerResponse resp = openStreams [i];
				try {
					if (resp.OutputStream.CanWrite) {
						resp.OutputStream.Write (_bytes, 0, _bytesToSend);
						resp.OutputStream.Flush ();
					} else {
						removeListenerAt (i);
						logError ("Can not write to endpoint, closing", true);
						closeSafely (resp);
					}
				} catch (IOException e) {
					removeListenerAt (i);

					if (e.InnerException is SocketException se) {
						// Aborted / shut down connections are regular client disconnects, not worth logging.
						if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
							logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true);
						}
					} else {
						logError ("IOException while trying to write:", true);
						Log.Exception (e);
					}

					// Release the response on this path too; it is no longer tracked anywhere else.
					closeSafely (resp);
				} catch (Exception e) {
					removeListenerAt (i);
					logError ("Exception while trying to write:", true);
					Log.Exception (e);
					closeSafely (resp);
				}
			}
		}

		/// <summary>
		/// Removes the listener at the given index and updates the connection statistics.
		/// </summary>
		/// <param name="_index">Index into the list of open streams.</param>
		private void removeListenerAt (int _index) {
			currentlyOpen--;
			totalClosed++;

			openStreams.RemoveAt (_index);
		}

		/// <summary>
		/// Closes a response that was already dropped from the listener list, ignoring any failure.
		/// </summary>
		/// <param name="_resp">Response to close.</param>
		private static void closeSafely (HttpListenerResponse _resp) {
			try {
				_resp.Close ();
			} catch (Exception) {
				// Deliberately ignored: the connection is already considered dead, and a failing close
				// must not abort delivery to the remaining listeners.
			}
		}

		/// <summary>
		/// Logs an error prefixed with the name of this event source.
		/// </summary>
		/// <param name="_message">Message to log.</param>
		/// <param name="_printConnections">If true the current connection statistics are appended.</param>
		protected void logError (string _message, bool _printConnections) {
			Log.Error (_printConnections
				? $"[Web] [SSE] '{Name}': {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
				: $"[Web] [SSE] '{Name}': {_message}");
		}

		/// <summary>
		/// Default permission level for this event source; 0 unless overridden by a subclass.
		/// </summary>
		public virtual int DefaultPermissionLevel () => 0;
	}
}
Note: See TracBrowser for help on using the repository browser.