source: binary-improvements2/WebServer/src/SSE/AbsEvent.cs@ 392

Last change on this file since 392 was 391, checked in by alloc, 2 years ago

Major refactoring/cleanup

File size: 3.9 KB
RevLine 
[391]1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net.Sockets;
5using System.Text;
6using AllocsFixes.JSON;
7using Webserver.UrlHandlers;
8using HttpListenerResponse = SpaceWizards.HttpListener.HttpListenerResponse;
9
10namespace Webserver.SSE {
11 public abstract class AbsEvent {
12 private const int EncodingBufferSize = 1024 * 1024;
13
14 private readonly SseHandler Parent;
15 public readonly string Name;
16
17 private readonly byte[] encodingBuffer;
18 private readonly StringBuilder stringBuilder = new StringBuilder ();
19
20 private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();
21
22 private readonly BlockingQueue<(string _eventName, object _data)> sendQueue =
23 new BlockingQueue<(string _eventName, object _data)> ();
24
25 private int currentlyOpen;
26 private int totalOpened;
27 private int totalClosed;
28
29 protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
30 Name = _name ?? GetType ().Name;
31 Parent = _parent;
32 if (_reuseEncodingBuffer) {
33 encodingBuffer = new byte[EncodingBufferSize];
34 }
35 }
36
37 public void AddListener (HttpListenerResponse _resp) {
38 totalOpened++;
39 currentlyOpen++;
40
41 openStreams.Add (_resp);
42 }
43
44 protected void SendData (string _eventName, object _data) {
45 sendQueue.Enqueue ((_eventName, _data));
46 Parent.SignalSendQueue ();
47 }
48
49
50 public void ProcessSendQueue () {
51 while (sendQueue.HasData ()) {
52 (string eventName, object data) = sendQueue.Dequeue ();
53
54 stringBuilder.Append ("event: ");
55 stringBuilder.AppendLine (eventName);
56 stringBuilder.Append ("data: ");
57
58 switch (data) {
59 case string dataString:
60 stringBuilder.AppendLine (dataString);
61 break;
62 case JsonNode dataJson:
63 dataJson.ToString (stringBuilder);
64 stringBuilder.AppendLine ("");
65 break;
66 default:
67 logError ("Data is neither string nor JSON.", false);
68 continue;
69 }
70
71 stringBuilder.AppendLine ("");
72 string output = stringBuilder.ToString ();
73 stringBuilder.Clear ();
74
75 byte[] buf;
76 int bytesToSend;
77 if (encodingBuffer != null) {
78 buf = encodingBuffer;
79 try {
80 bytesToSend = Encoding.UTF8.GetBytes (output, 0, output.Length, buf, 0);
81 } catch (ArgumentException e) {
82 logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
83 Log.Exception (e);
84 return;
85 }
86 } else {
87 buf = Encoding.UTF8.GetBytes (output);
88 bytesToSend = buf.Length;
89 }
90
91 for (int i = openStreams.Count - 1; i >= 0; i--) {
92 HttpListenerResponse resp = openStreams [i];
93 try {
94 if (resp.OutputStream.CanWrite) {
95 resp.OutputStream.Write (buf, 0, bytesToSend);
96 resp.OutputStream.Flush ();
97 } else {
98 currentlyOpen--;
99 totalClosed++;
100
101 logError ("Can not write to endpoint, closing", true);
102 openStreams.RemoveAt (i);
103 resp.Close ();
104 }
105 } catch (IOException e) {
106 currentlyOpen--;
107 totalClosed++;
108
109 openStreams.RemoveAt (i);
110
111 if (e.InnerException is SocketException se) {
112 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
113 logError ($"SocketError ({se.SocketErrorCode}) while trying to write", true);
114 }
115 } else {
116 logError ("IOException while trying to write:", true);
117 Log.Exception (e);
118 }
119 } catch (Exception e) {
120 currentlyOpen--;
121 totalClosed++;
122
123 openStreams.RemoveAt (i);
124 logError ("Exception while trying to write:", true);
125 Log.Exception (e);
126 resp.Close ();
127 }
128 }
129 }
130 }
131
132 protected void logError (string _message, bool _printConnections) {
133 Log.Error (_printConnections
134 ? $"SSE ({Name}): {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
135 : $"SSE ({Name}): {_message}");
136 }
137
138 public virtual int DefaultPermissionLevel () {
139 return 0;
140 }
141 }
142}
Note: See TracBrowser for help on using the repository browser.