source: binary-improvements2/MapRendering/Web/SSE/EventBase.cs@ 384

Last change on this file since 384 was 382, checked in by alloc, 2 years ago

Switched to use SpaceWizards.HttpListener

File size: 4.0 KB
RevLine 
[367]1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net.Sockets;
5using System.Text;
6using AllocsFixes.JSON;
[382]7using HttpListenerResponse = SpaceWizards.HttpListener.HttpListenerResponse;
[367]8
9namespace AllocsFixes.NetConnections.Servers.Web.SSE {
10 public abstract class EventBase {
11 private const int EncodingBufferSize = 1024 * 1024;
12
13 private readonly SseHandler Parent;
14 public readonly string Name;
15
16 private readonly byte[] encodingBuffer;
17 private readonly StringBuilder stringBuilder = new StringBuilder ();
18
19 private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();
20
21 private readonly global::BlockingQueue<(string _eventName, object _data)> sendQueue =
22 new global::BlockingQueue<(string _eventName, object _data)> ();
23
24 private int currentlyOpen;
25 private int totalOpened;
26 private int totalClosed;
27
28 protected EventBase (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
29 Name = _name ?? GetType ().Name;
30 Parent = _parent;
31 if (_reuseEncodingBuffer) {
32 encodingBuffer = new byte[EncodingBufferSize];
33 }
34 }
35
36 public virtual void AddListener (HttpListenerResponse _resp) {
37 totalOpened++;
38 currentlyOpen++;
39
40 openStreams.Add (_resp);
41 }
42
43 protected void SendData (string _eventName, object _data) {
44 sendQueue.Enqueue ((_eventName, _data));
45 Parent.SignalSendQueue ();
46 }
47
48
49 public void ProcessSendQueue () {
50 while (sendQueue.HasData ()) {
51 (string eventName, object data) = sendQueue.Dequeue ();
52
53 stringBuilder.Append ("event: ");
54 stringBuilder.AppendLine (eventName);
55 stringBuilder.Append ("data: ");
56
[372]57 switch (data) {
58 case string dataString:
59 stringBuilder.AppendLine (dataString);
60 break;
61 case JSONNode dataJson:
62 dataJson.ToString (stringBuilder);
63 stringBuilder.AppendLine ("");
64 break;
65 default:
66 logError ("Data is neither string nor JSON.", false);
67 continue;
[367]68 }
69
70 stringBuilder.AppendLine ("");
71 string output = stringBuilder.ToString ();
72 stringBuilder.Clear ();
73
74 byte[] buf;
75 int bytesToSend;
76 if (encodingBuffer != null) {
77 buf = encodingBuffer;
78 try {
79 bytesToSend = Encoding.UTF8.GetBytes (output, 0, output.Length, buf, 0);
80 } catch (ArgumentException e) {
[372]81 logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
[367]82 Log.Exception (e);
83 return;
84 }
85 } else {
86 buf = Encoding.UTF8.GetBytes (output);
87 bytesToSend = buf.Length;
88 }
89
90 for (int i = openStreams.Count - 1; i >= 0; i--) {
91 HttpListenerResponse resp = openStreams [i];
92 try {
93 if (resp.OutputStream.CanWrite) {
94 resp.OutputStream.Write (buf, 0, bytesToSend);
95 resp.OutputStream.Flush ();
96 } else {
97 currentlyOpen--;
98 totalClosed++;
99
[372]100 logError ("Can not write to endpoint, closing", true);
[367]101 openStreams.RemoveAt (i);
102 resp.Close ();
103 }
104 } catch (IOException e) {
105 currentlyOpen--;
106 totalClosed++;
107
108 openStreams.RemoveAt (i);
109
110 if (e.InnerException is SocketException se) {
[372]111 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
112 logError ($"SocketError ({se.SocketErrorCode}) while trying to write", true);
[367]113 }
114 } else {
[372]115 logError ("IOException while trying to write:", true);
[367]116 Log.Exception (e);
117 }
118 } catch (Exception e) {
119 currentlyOpen--;
120 totalClosed++;
121
122 openStreams.RemoveAt (i);
[372]123 logError ("Exception while trying to write:", true);
[367]124 Log.Exception (e);
125 resp.Close ();
126 }
127 }
128 }
129 }
130
[372]131 protected void logError (string _message, bool _printConnections) {
132 Log.Error (_printConnections
133 ? $"SSE ({Name}): {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
134 : $"SSE ({Name}): {_message}");
135 }
136
[367]137 public virtual int DefaultPermissionLevel () {
138 return 0;
139 }
140 }
141}
Note: See TracBrowser for help on using the repository browser.