mirror of
https://gitlab.com/ric_harvey/MailHog.git
synced 2024-11-25 07:14:04 +00:00
123 lines
2.4 KiB
Go
123 lines
2.4 KiB
Go
|
package goose
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// ErrUnableToHijackRequest is returned by AddReceiver if the type
|
||
|
// conversion to http.Hijacker is unsuccessful
|
||
|
ErrUnableToHijackRequest = errors.New("Unable to hijack request")
|
||
|
)
|
||
|
|
||
|
// EventStream represents a collection of receivers
|
||
|
type EventStream struct {
|
||
|
mutex *sync.Mutex
|
||
|
receivers map[net.Conn]*EventReceiver
|
||
|
}
|
||
|
|
||
|
// NewEventStream creates a new event stream
|
||
|
func NewEventStream() *EventStream {
|
||
|
return &EventStream{
|
||
|
mutex: new(sync.Mutex),
|
||
|
receivers: make(map[net.Conn]*EventReceiver),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// EventReceiver represents a hijacked HTTP connection
|
||
|
type EventReceiver struct {
|
||
|
stream *EventStream
|
||
|
conn net.Conn
|
||
|
bufrw *bufio.ReadWriter
|
||
|
}
|
||
|
|
||
|
// Notify sends the event to all event stream receivers
|
||
|
func (es *EventStream) Notify(event string, bytes []byte) {
|
||
|
// TODO reader?
|
||
|
|
||
|
lines := strings.Split(string(bytes), "\n")
|
||
|
|
||
|
data := ""
|
||
|
for _, l := range lines {
|
||
|
data += event + ": " + l + "\n"
|
||
|
}
|
||
|
|
||
|
sz := len(data) + 1
|
||
|
size := fmt.Sprintf("%X", sz)
|
||
|
|
||
|
for _, er := range es.receivers {
|
||
|
go er.send(size, data)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (er *EventReceiver) send(size, data string) {
|
||
|
_, err := er.write([]byte(size + "\r\n"))
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
lines := strings.Split(data, "\n")
|
||
|
for _, ln := range lines {
|
||
|
_, err = er.write([]byte(ln + "\n"))
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
er.write([]byte("\r\n"))
|
||
|
}
|
||
|
|
||
|
func (er *EventReceiver) write(bytes []byte) (int, error) {
|
||
|
n, err := er.bufrw.Write(bytes)
|
||
|
|
||
|
if err != nil {
|
||
|
er.stream.mutex.Lock()
|
||
|
delete(er.stream.receivers, er.conn)
|
||
|
er.stream.mutex.Unlock()
|
||
|
er.conn.Close()
|
||
|
return n, err
|
||
|
}
|
||
|
|
||
|
err = er.bufrw.Flush()
|
||
|
if err != nil {
|
||
|
er.stream.mutex.Lock()
|
||
|
delete(er.stream.receivers, er.conn)
|
||
|
er.stream.mutex.Unlock()
|
||
|
er.conn.Close()
|
||
|
}
|
||
|
|
||
|
return n, err
|
||
|
}
|
||
|
|
||
|
// AddReceiver hijacks a http.ResponseWriter and attaches it to the event stream
|
||
|
func (es *EventStream) AddReceiver(w http.ResponseWriter) (*EventReceiver, error) {
|
||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||
|
w.Header().Set("Cache-Control", "no-cache")
|
||
|
w.Header().Set("Connection", "keep-alive")
|
||
|
|
||
|
w.WriteHeader(200)
|
||
|
|
||
|
hj, ok := w.(http.Hijacker)
|
||
|
if !ok {
|
||
|
return nil, ErrUnableToHijackRequest
|
||
|
}
|
||
|
|
||
|
hjConn, hjBufrw, err := hj.Hijack()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
rec := &EventReceiver{es, hjConn, hjBufrw}
|
||
|
|
||
|
es.mutex.Lock()
|
||
|
es.receivers[hjConn] = rec
|
||
|
es.mutex.Unlock()
|
||
|
|
||
|
return rec, nil
|
||
|
}
|