source: binary-improvements/MapRendering/Web/SSE/EventBase.cs@ 370

Last change on this file since 370 was 367, checked in by alloc, 3 years ago

Web:

  • Added SSE (ServerSentEvents) subsystem
  • Added log endpoint to SSE. Less heavy weight and more responsive way of watching the server log
  • Bunch of refactoring
File size: 4.0 KB
RevLine 
[367]1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Net;
5using System.Net.Sockets;
6using System.Text;
7using AllocsFixes.JSON;
8
9namespace AllocsFixes.NetConnections.Servers.Web.SSE {
10 public abstract class EventBase {
11 private const int EncodingBufferSize = 1024 * 1024;
12
13 private readonly SseHandler Parent;
14 public readonly string Name;
15
16 private readonly byte[] encodingBuffer;
17 private readonly StringBuilder stringBuilder = new StringBuilder ();
18
19 private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();
20
21 private readonly global::BlockingQueue<(string _eventName, object _data)> sendQueue =
22 new global::BlockingQueue<(string _eventName, object _data)> ();
23
24 private int currentlyOpen;
25 private int totalOpened;
26 private int totalClosed;
27
28 protected EventBase (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
29 Name = _name ?? GetType ().Name;
30 Parent = _parent;
31 if (_reuseEncodingBuffer) {
32 encodingBuffer = new byte[EncodingBufferSize];
33 }
34 }
35
36 public virtual void AddListener (HttpListenerResponse _resp) {
37 totalOpened++;
38 currentlyOpen++;
39
40 openStreams.Add (_resp);
41 }
42
43 protected void SendData (string _eventName, object _data) {
44 sendQueue.Enqueue ((_eventName, _data));
45 Parent.SignalSendQueue ();
46 }
47
48
49 public void ProcessSendQueue () {
50 while (sendQueue.HasData ()) {
51 (string eventName, object data) = sendQueue.Dequeue ();
52
53 stringBuilder.Append ("event: ");
54 stringBuilder.AppendLine (eventName);
55 stringBuilder.Append ("data: ");
56
57 if (data is string dataString) {
58 stringBuilder.AppendLine (dataString);
59 } else if (data is JSONNode dataJson) {
60 dataJson.ToString (stringBuilder);
61 stringBuilder.AppendLine ("");
62 } else {
63 Log.Error ($"SSE ({Name}): Data is neither string nor JSON.");
64 continue;
65 }
66
67 stringBuilder.AppendLine ("");
68 string output = stringBuilder.ToString ();
69 stringBuilder.Clear ();
70
71 byte[] buf;
72 int bytesToSend;
73 if (encodingBuffer != null) {
74 buf = encodingBuffer;
75 try {
76 bytesToSend = Encoding.UTF8.GetBytes (output, 0, output.Length, buf, 0);
77 } catch (ArgumentException e) {
78 Log.Error ($"SSE ({Name}): Exception while encoding data for output, most likely exceeding buffer size:");
79 Log.Exception (e);
80 return;
81 }
82 } else {
83 buf = Encoding.UTF8.GetBytes (output);
84 bytesToSend = buf.Length;
85 }
86
87 for (int i = openStreams.Count - 1; i >= 0; i--) {
88 HttpListenerResponse resp = openStreams [i];
89 try {
90 if (resp.OutputStream.CanWrite) {
91 resp.OutputStream.Write (buf, 0, bytesToSend);
92 resp.OutputStream.Flush ();
93 } else {
94 currentlyOpen--;
95 totalClosed++;
96
97 Log.Out (
98 $"SSE ({Name}): Can not write to endpoint, closing. (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
99 openStreams.RemoveAt (i);
100 resp.Close ();
101 }
102 } catch (IOException e) {
103 currentlyOpen--;
104 totalClosed++;
105
106 openStreams.RemoveAt (i);
107
108 if (e.InnerException is SocketException se) {
109 if (se.SocketErrorCode != SocketError.ConnectionAborted) {
110 Log.Error ($"SSE ({Name}): SocketError ({se.SocketErrorCode}) while trying to write: (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
111 }
112 } else {
113 Log.Error (
114 $"SSE ({Name}): IOException while trying to write: (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
115 Log.Exception (e);
116 }
117
118 resp.Close ();
119 } catch (Exception e) {
120 currentlyOpen--;
121 totalClosed++;
122
123 openStreams.RemoveAt (i);
124 Log.Error (
125 $"SSE ({Name}): Exception while trying to write: (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
126 Log.Exception (e);
127 resp.Close ();
128 }
129 }
130 }
131 }
132
133 public virtual int DefaultPermissionLevel () {
134 return 0;
135 }
136 }
137}
Note: See TracBrowser for help on using the repository browser.