From 5e4da01c3ae3ae2e870faba9085d9d9213c01c29 Mon Sep 17 00:00:00 2001 From: r Date: Fri, 13 Dec 2019 18:08:26 +0000 Subject: Initial commit --- mastodon/streaming.go | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 mastodon/streaming.go (limited to 'mastodon/streaming.go') diff --git a/mastodon/streaming.go b/mastodon/streaming.go new file mode 100644 index 0000000..77ae284 --- /dev/null +++ b/mastodon/streaming.go @@ -0,0 +1,166 @@ +package mastodon + +import ( + "bufio" + "context" + "encoding/json" + "io" + "net/http" + "net/url" + "path" + "strings" +) + +// UpdateEvent is struct for passing status event to app. +type UpdateEvent struct { + Status *Status `json:"status"` +} + +func (e *UpdateEvent) event() {} + +// NotificationEvent is struct for passing notification event to app. +type NotificationEvent struct { + Notification *Notification `json:"notification"` +} + +func (e *NotificationEvent) event() {} + +// DeleteEvent is struct for passing deletion event to app. +type DeleteEvent struct{ ID string } + +func (e *DeleteEvent) event() {} + +// ErrorEvent is struct for passing errors to app. +type ErrorEvent struct{ err error } + +func (e *ErrorEvent) event() {} +func (e *ErrorEvent) Error() string { return e.err.Error() } + +// Event is interface passing events to app. +type Event interface { + event() +} + +func handleReader(q chan Event, r io.Reader) error { + var name string + s := bufio.NewScanner(r) + for s.Scan() { + line := s.Text() + token := strings.SplitN(line, ":", 2) + if len(token) != 2 { + continue + } + switch strings.TrimSpace(token[0]) { + case "event": + name = strings.TrimSpace(token[1]) + case "data": + var err error + switch name { + case "update": + var status Status + err = json.Unmarshal([]byte(token[1]), &status) + if err == nil { + q <- &UpdateEvent{&status} + } + case "notification": + var notification Notification + err = json.Unmarshal([]byte(token[1]), ¬ification) + if err == nil { + q <- &NotificationEvent{¬ification} + } + case "delete": + q <- &DeleteEvent{ID: string(strings.TrimSpace(token[1]))} + } + if err != nil { + q <- &ErrorEvent{err} + } + } + } + return s.Err() +} + +func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) { + u, err := url.Parse(c.config.Server) + if err != nil { + return nil, err + } + u.Path = path.Join(u.Path, "/api/v1/streaming", p) + u.RawQuery = params.Encode() + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + req.Header.Set("Authorization", "Bearer "+c.config.AccessToken) + + q := make(chan Event) + go func() { + defer close(q) + for { + select { + case <-ctx.Done(): + return + default: + } + + c.doStreaming(req, q) + } + }() + return q, nil +} + +func (c *Client) doStreaming(req *http.Request, q chan Event) { + resp, err := c.Do(req) + if err != nil { + q <- &ErrorEvent{err} + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + q <- &ErrorEvent{parseAPIError("bad request", resp)} + return + } + + err = handleReader(q, resp.Body) + if err != nil { + q <- &ErrorEvent{err} + } +} + +// StreamingUser return channel to read events on home. +func (c *Client) StreamingUser(ctx context.Context) (chan Event, error) { + return c.streaming(ctx, "user", nil) +} + +// StreamingPublic return channel to read events on public. +func (c *Client) StreamingPublic(ctx context.Context, isLocal bool) (chan Event, error) { + p := "public" + if isLocal { + p = path.Join(p, "local") + } + + return c.streaming(ctx, p, nil) +} + +// StreamingHashtag return channel to read events on tagged timeline. +func (c *Client) StreamingHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error) { + params := url.Values{} + params.Set("tag", tag) + + p := "hashtag" + if isLocal { + p = path.Join(p, "local") + } + + return c.streaming(ctx, p, params) +} + +// StreamingList return channel to read events on a list. +func (c *Client) StreamingList(ctx context.Context, id string) (chan Event, error) { + params := url.Values{} + params.Set("list", string(id)) + + return c.streaming(ctx, "list", params) +} -- cgit v1.2.3