source: TFP-WebServer/WebServer/src/SSE/AbsEvent.cs@ 491

Last change on this file since 491 was 487, checked in by alloc, 5 months ago

1.1.0.1 Release for V 1.0

File size: 2.9 KB
RevLine 
[391]1using System;
2using System.Collections.Generic;
3using System.Text;
4using Webserver.UrlHandlers;
5
6namespace Webserver.SSE {
7 public abstract class AbsEvent {
[409]8 private const int encodingBufferSize = 1024 * 1024;
[391]9
[409]10 private readonly SseHandler parent;
[391]11 public readonly string Name;
12
13 private readonly byte[] encodingBuffer;
14 private readonly StringBuilder stringBuilder = new StringBuilder ();
15
[487]16 private readonly List<SseClient> openClients = new List<SseClient> ();
[391]17
[402]18 private readonly BlockingQueue<(string _eventName, string _data)> sendQueue =
19 new BlockingQueue<(string _eventName, string _data)> ();
[391]20
21 private int currentlyOpen;
22 private int totalOpened;
23 private int totalClosed;
24
25 protected AbsEvent (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
26 Name = _name ?? GetType ().Name;
[409]27 parent = _parent;
[391]28 if (_reuseEncodingBuffer) {
[409]29 encodingBuffer = new byte[encodingBufferSize];
[391]30 }
31 }
32
[487]33 public void AddListener (SseClient _client) {
[391]34 totalOpened++;
35 currentlyOpen++;
36
[487]37 openClients.Add (_client);
[391]38 }
39
[402]40 protected void SendData (string _eventName, string _data) {
[391]41 sendQueue.Enqueue ((_eventName, _data));
[409]42 parent.SignalSendQueue ();
[391]43 }
44
45 public void ProcessSendQueue () {
46 while (sendQueue.HasData ()) {
[402]47 (string eventName, string data) = sendQueue.Dequeue ();
[391]48
49 stringBuilder.Append ("event: ");
50 stringBuilder.AppendLine (eventName);
51 stringBuilder.Append ("data: ");
[402]52 stringBuilder.AppendLine (data);
[391]53
54 stringBuilder.AppendLine ("");
[402]55
[391]56 string output = stringBuilder.ToString ();
57 stringBuilder.Clear ();
58
59 byte[] buf;
60 int bytesToSend;
61 if (encodingBuffer != null) {
62 buf = encodingBuffer;
63 try {
64 bytesToSend = Encoding.UTF8.GetBytes (output, 0, output.Length, buf, 0);
65 } catch (ArgumentException e) {
66 logError ("Exception while encoding data for output, most likely exceeding buffer size:", false);
67 Log.Exception (e);
68 return;
69 }
70 } else {
71 buf = Encoding.UTF8.GetBytes (output);
72 bytesToSend = buf.Length;
73 }
74
[409]75 sendBufToListeners (buf, bytesToSend);
76 }
77 }
[391]78
[409]79 private void sendBufToListeners (byte[] _bytes, int _bytesToSend) {
[487]80 for (int i = openClients.Count - 1; i >= 0; i--) {
81 ESseClientWriteResult writeResult = openClients [i].Write (_bytes, _bytesToSend);
82 if (writeResult == ESseClientWriteResult.Ok) {
83 continue;
84 }
[391]85
[487]86 currentlyOpen--;
87 totalClosed++;
[409]88
[487]89 if (writeResult == ESseClientWriteResult.Error) {
90 logError ("Can not write to endpoint, closing", true);
[391]91 }
92 }
93 }
94
95 protected void logError (string _message, bool _printConnections) {
96 Log.Error (_printConnections
[399]97 ? $"[Web] [SSE] '{Name}': {_message} (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed})"
98 : $"[Web] [SSE] '{Name}': {_message}");
[391]99 }
100
[409]101 public virtual int DefaultPermissionLevel () => 0;
[487]102
103 public void ClientClosed (SseClient _client) {
104 openClients.Remove (_client);
105 }
[391]106 }
107}
Note: See TracBrowser for help on using the repository browser.