source: binary-improvements/MapRendering/Web/SSE/EventBase.cs@ 408

Last change on this file since 408 was 372, checked in by alloc, 2 years ago

A few bits of code cleanup

File size: 3.9 KB
RevLine 
[367]1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net;
5using System.Net.Sockets;
6using System.Text;
7using AllocsFixes.JSON;
8
9namespace AllocsFixes.NetConnections.Servers.Web.SSE {
10 public abstract class EventBase {
11 private const int EncodingBufferSize = 1024 * 1024;
12
13 private readonly SseHandler Parent;
14 public readonly string Name;
15
16 private readonly byte[] encodingBuffer;
17 private readonly StringBuilder stringBuilder = new StringBuilder ();
18
19 private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();
20
21 private readonly global::BlockingQueue<(string _eventName, object _data)> sendQueue =
22 new global::BlockingQueue<(string _eventName, object _data)> ();
23
24 private int currentlyOpen;
25 private int totalOpened;
26 private int totalClosed;
27
28 protected EventBase (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
29 Name = _name ?? GetType ().Name;
30 Parent = _parent;
31 if (_reuseEncodingBuffer) {
32 encodingBuffer = new byte[EncodingBufferSize];
33 }
34 }
35
36 public virtual void AddListener (HttpListenerResponse _resp) {
37 totalOpened++;
38 currentlyOpen++;
39
40 openStreams.Add (_resp);
41 }
42
43 protected void SendData (string _eventName, object _data) {
44 sendQueue.Enqueue ((_eventName, _data));
45 Parent.SignalSendQueue ();
46 }
47
48
49 public void ProcessSendQueue () {
50 while (sendQueue.HasData ()) {
51 (string eventName, object data) = sendQueue.Dequeue ();
52
53 stringBuilder.Append ("event: ");
54 stringBuilder.AppendLine (eventName);
55 stringBuilder.Append ("data: ");
56
[372]57 switch (data) {
58 case string dataString:
59 stringBuilder.AppendLine (dataString);
60 break;
61 case JSONNode dataJson:
62 dataJson.ToString (stringBuilder);
63 stringBuilder.AppendLine ("");
64 break;
65 default:
66 logError ("Data is neither string nor JSON.", false);
67 continue;
[367]68 }
69
70 stringBuilder.AppendLine ("");
71 string output = stringBuilder.ToString ();
72 stringBuilder.Clear ();
73
74 byte[] buf;
75 int bytesToSend;
76 if (encodingBuffer != null) {
77 buf = encodingBuffer;
78 try {
79 bytesToSend = Encoding.UTF8.GetBytes (output, 0, output.Length, buf, 0);
80 } catch (ArgumentException e) {
[372]81 logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
[367]82 Log.Exception (e);
83 return;
84 }
85 } else {
86 buf = Encoding.UTF8.GetBytes (output);
87 bytesToSend = buf.Length;
88 }
89
90 for (int i = openStreams.Count - 1; i >= 0; i--) {
91 HttpListenerResponse resp = openStreams [i];
92 try {
93 if (resp.OutputStream.CanWrite) {
94 resp.OutputStream.Write (buf, 0, bytesToSend);
95 resp.OutputStream.Flush ();
96 } else {
97 currentlyOpen--;
98 totalClosed++;
99
[372]100 logError ("Can not write to endpoint, closing", true);
[367]101 openStreams.RemoveAt (i);
102 resp.Close ();
103 }
104 } catch (IOException e) {
105 currentlyOpen--;
106 totalClosed++;
107
108 openStreams.RemoveAt (i);
109
110 if (e.InnerException is SocketException se) {
[372]111 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) {
112 logError ($"SocketError ({se.SocketErrorCode}) while trying to write", true);
[367]113 }
114 } else {
[372]115 logError ("IOException while trying to write:", true);
[367]116 Log.Exception (e);
117 }
118
119 resp.Close ();
120 } catch (Exception e) {
121 currentlyOpen--;
122 totalClosed++;
123
124 openStreams.RemoveAt (i);
[372]125 logError ("Exception while trying to write:", true);
[367]126 Log.Exception (e);
127 resp.Close ();
128 }
129 }
130 }
131 }
132
[372]133 protected void logError (string _message, bool _printConnections) {
134 Log.Error (_printConnections
135 ? $"SSE ({Name}): {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
136 : $"SSE ({Name}): {_message}");
137 }
138
[367]139 public virtual int DefaultPermissionLevel () {
140 return 0;
141 }
142 }
143}
Note: See TracBrowser for help on using the repository browser.