source: binary-improvements2/WebServer/src/SSE/AbsEvent.cs@ 402

Last change on this file since 402 was 402, checked in by alloc, 22 months ago
  • Major refactoring
  • Using Utf8Json for (de)serialization
  • Moving APIs to REST
  • Removing dependencies from WebServer and MapRenderer to ServerFixes
File size: 3.7 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net.Sockets;
5using System.Text;
6using Webserver.UrlHandlers;
7using HttpListenerResponse = SpaceWizards.HttpListener.HttpListenerResponse;
8
namespace Webserver.SSE {
	/// <summary>
	/// Base class for a named event source that pushes messages in the
	/// "event: ... / data: ..." text format to a set of open HTTP responses.
	/// Messages are queued by <see cref="SendData"/> and written to all registered
	/// responses by <see cref="ProcessSendQueue"/>.
	/// </summary>
	/// <remarks>
	/// NOTE(review): the listener list and the counters are not synchronized in this class.
	/// Presumably the owning SseHandler serializes calls to <see cref="AddListener"/> and
	/// <see cref="ProcessSendQueue"/> - confirm against the caller.
	/// </remarks>
	public abstract class AbsEvent {
		/// <summary>Size in bytes of the reusable UTF-8 encoding buffer (1 MiB).</summary>
		private const int EncodingBufferSize = 1024 * 1024;

		/// <summary>Characters that end a line inside a message payload.</summary>
		private static readonly char[] lineBreakChars = { '\r', '\n' };

		private readonly SseHandler Parent;

		/// <summary>Name of this event source; defaults to the concrete type's name.</summary>
		public readonly string Name;

		/// <summary>Shared encoding buffer, or null if a fresh array is allocated per message.</summary>
		private readonly byte[] encodingBuffer;

		private readonly StringBuilder stringBuilder = new StringBuilder ();

		private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();

		private readonly BlockingQueue<(string _eventName, string _data)> sendQueue =
			new BlockingQueue<(string _eventName, string _data)> ();

		// Connection statistics, only used for log output
		private int currentlyOpen;
		private int totalOpened;
		private int totalClosed;

		/// <summary>
		/// Creates the event source.
		/// </summary>
		/// <param name="_parent">Handler that is signalled whenever a message has been queued.</param>
		/// <param name="_reuseEncodingBuffer">
		/// If true a single buffer of <see cref="EncodingBufferSize"/> bytes is allocated once and reused
		/// for every message; messages that do not fit are dropped with an error log entry.
		/// If false a new byte array is allocated for every message.
		/// </param>
		/// <param name="_name">Name of the event source; if null the runtime type name is used.</param>
		protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
			Name = _name ?? GetType ().Name;
			Parent = _parent;
			if (_reuseEncodingBuffer) {
				encodingBuffer = new byte[EncodingBufferSize];
			}
		}

		/// <summary>
		/// Registers a response whose output stream receives all messages sent from now on.
		/// </summary>
		/// <param name="_resp">Response to write future messages to.</param>
		public void AddListener (HttpListenerResponse _resp) {
			totalOpened++;
			currentlyOpen++;

			openStreams.Add (_resp);
		}

		/// <summary>
		/// Queues a message for delivery and notifies the parent handler that data is pending.
		/// </summary>
		/// <param name="_eventName">Value written to the "event:" field.</param>
		/// <param name="_data">Payload written to the "data:" field(s).</param>
		protected void SendData (string _eventName, string _data) {
			sendQueue.Enqueue ((_eventName, _data));
			Parent.SignalSendQueue ();
		}

		/// <summary>
		/// Drains the send queue, writing every queued message to all registered responses.
		/// Responses that can no longer be written to are removed and closed.
		/// </summary>
		public void ProcessSendQueue () {
			while (sendQueue.HasData ()) {
				(string eventName, string data) = sendQueue.Dequeue ();

				string output = buildMessage (eventName, data);

				if (!tryEncode (output, out byte[] buf, out int bytesToSend)) {
					// Only this message is lost; keep delivering the rest of the queue
					continue;
				}

				broadcast (buf, bytesToSend);
			}
		}

		/// <summary>
		/// Formats a single message: an "event:" line, one "data:" line per payload line
		/// and a terminating empty line.
		/// </summary>
		private string buildMessage (string _eventName, string _data) {
			stringBuilder.Append ("event: ");
			stringBuilder.AppendLine (_eventName);
			appendDataLines (_data);
			stringBuilder.AppendLine ();

			string output = stringBuilder.ToString ();
			stringBuilder.Clear ();
			return output;
		}

		/// <summary>
		/// Appends the payload as "data:" lines. The payload is split at CR, LF and CRLF so that
		/// every line carries its own "data: " prefix; a line break inside a single "data:" field
		/// would otherwise end the field (or, for an empty line, the whole event) prematurely.
		/// A payload without line breaks (or a null payload) results in exactly one "data:" line.
		/// </summary>
		private void appendDataLines (string _data) {
			string payload = _data ?? string.Empty;
			int length = payload.Length;
			int lineStart = 0;

			while (true) {
				int lineEnd = payload.IndexOfAny (lineBreakChars, lineStart);
				int segmentEnd = lineEnd < 0 ? length : lineEnd;

				stringBuilder.Append ("data: ");
				stringBuilder.Append (payload, lineStart, segmentEnd - lineStart);
				stringBuilder.AppendLine ();

				if (lineEnd < 0) {
					return;
				}

				lineStart = lineEnd + 1;
				// Treat CRLF as a single line break
				if (payload [lineEnd] == '\r' && lineStart < length && payload [lineStart] == '\n') {
					lineStart++;
				}
			}
		}

		/// <summary>
		/// Encodes the message as UTF-8, either into the shared buffer or into a new array.
		/// </summary>
		/// <returns>False if the message could not be encoded into the shared buffer (an error is logged).</returns>
		private bool tryEncode (string _output, out byte[] _buffer, out int _byteCount) {
			if (encodingBuffer == null) {
				_buffer = Encoding.UTF8.GetBytes (_output);
				_byteCount = _buffer.Length;
				return true;
			}

			_buffer = encodingBuffer;
			try {
				_byteCount = Encoding.UTF8.GetBytes (_output, 0, _output.Length, _buffer, 0);
				return true;
			} catch (ArgumentException e) {
				logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
				Log.Exception (e);
				_byteCount = 0;
				return false;
			}
		}

		/// <summary>
		/// Writes the encoded message to every registered response. Iterates backwards so that
		/// failed responses can be removed from the list while iterating.
		/// </summary>
		private void broadcast (byte[] _buffer, int _byteCount) {
			for (int i = openStreams.Count - 1; i >= 0; i--) {
				HttpListenerResponse resp = openStreams [i];

				bool canWrite;
				try {
					canWrite = resp.OutputStream.CanWrite;
					if (canWrite) {
						resp.OutputStream.Write (_buffer, 0, _byteCount);
						resp.OutputStream.Flush ();
					}
				} catch (IOException e) {
					dropStream (i);

					if (e.InnerException is SocketException se) {
						// Aborted / shut down connections are regular client disconnects, do not log those
						if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
							logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true);
						}
					} else {
						logError ("IOException while trying to write:", true);
						Log.Exception (e);
					}

					closeQuietly (resp);
					continue;
				} catch (Exception e) {
					dropStream (i);
					logError ("Exception while trying to write:", true);
					Log.Exception (e);
					closeQuietly (resp);
					continue;
				}

				// Handled outside of the try block so a failure during cleanup can never be
				// processed a second time by the catch handlers above
				if (!canWrite) {
					dropStream (i);
					logError ("Can not write to endpoint, closing", true);
					closeQuietly (resp);
				}
			}
		}

		/// <summary>
		/// Removes the response at the given index from the listener list and updates the statistics.
		/// </summary>
		private void dropStream (int _index) {
			currentlyOpen--;
			totalClosed++;

			openStreams.RemoveAt (_index);
		}

		/// <summary>
		/// Closes a response that has already been removed from the listener list.
		/// </summary>
		private static void closeQuietly (HttpListenerResponse _resp) {
			try {
				_resp.Close ();
			} catch (Exception) {
				// The response is being discarded because writing to it failed; a failure to close it
				// must not interrupt delivery to the remaining listeners
			}
		}

		/// <summary>
		/// Logs an error prefixed with the event source name.
		/// </summary>
		/// <param name="_message">Message text.</param>
		/// <param name="_printConnections">If true the connection statistics are appended to the message.</param>
		protected void logError (string _message, bool _printConnections) {
			Log.Error (_printConnections
				? $"[Web] [SSE] '{Name}': {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
				: $"[Web] [SSE] '{Name}': {_message}");
		}

		/// <summary>
		/// Default permission level for this event source; 0 unless overridden.
		/// </summary>
		public virtual int DefaultPermissionLevel () {
			return 0;
		}
	}
}
Note: See TracBrowser for help on using the repository browser.