217 lines
7.8 KiB
Go
217 lines
7.8 KiB
Go
// Copyright 2026 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
// Package milestone_events publishes milestone progress changes as
// Server-Sent Events so other browser tabs viewing the same repository's
// milestone list (or a single milestone's issue list) can update their
// progress bars in near real time.
//
// Each public Publish* helper marshals a typed payload to JSON, wraps it
// in an *eventsource.Event whose Name is "repo-milestones.{repo_id}", and
// fans the event out to every currently connected user who has read
// access to the repository's issues unit. All publish helpers are
// non-blocking: they spawn a goroutine so request handlers do not stall
// on slow consumers.
|
|
package milestone_events
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
|
|
issues_model "code.gitea.io/gitea/models/issues"
|
|
access_model "code.gitea.io/gitea/models/perm/access"
|
|
repo_model "code.gitea.io/gitea/models/repo"
|
|
"code.gitea.io/gitea/models/unit"
|
|
user_model "code.gitea.io/gitea/models/user"
|
|
"code.gitea.io/gitea/modules/eventsource"
|
|
"code.gitea.io/gitea/modules/graceful"
|
|
"code.gitea.io/gitea/modules/json"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/sessiontag"
|
|
)
|
|
|
|
// Event payload structs ------------------------------------------------------
|
|
|
|
// MilestoneProgress is the JSON payload broadcast when a milestone's
// issue counters, and with them its completeness percentage, change.
// Every mutation that can move the progress bar is reported through it:
// closing or reopening an issue, (re)assigning a milestone, creating or
// deleting an issue, changing the milestone's status, and editing the
// milestone.
type MilestoneProgress struct {
	// RepoID is the id of the repository that owns the milestone.
	RepoID int64 `json:"repo_id"`
	// MilestoneID is the id of the milestone whose counters changed.
	MilestoneID int64 `json:"milestone_id"`
	// OpenIssues is the milestone's open-issue counter.
	OpenIssues int `json:"open_issues"`
	// ClosedIssues is the milestone's closed-issue counter.
	ClosedIssues int `json:"closed_issues"`
	// Completeness is the milestone's completeness value.
	Completeness int `json:"completeness"`
	// SessionTag is the session tag resolved from the publishing request's
	// context; it is left out of the JSON when empty.
	SessionTag string `json:"session_tag,omitempty"`
}
|
|
|
|
// MilestoneDeleted is the JSON payload broadcast after a milestone has
// been deleted, so that viewers can remove its card or leave a
// single-milestone view.
type MilestoneDeleted struct {
	// RepoID is the id of the repository the milestone belonged to.
	RepoID int64 `json:"repo_id"`
	// MilestoneID is the id of the deleted milestone.
	MilestoneID int64 `json:"milestone_id"`
}
|
|
|
|
// Broadcast plumbing ---------------------------------------------------------
|
|
|
|
// broadcastFn is the package-level seam used to send an event to a set of
|
|
// uids. Tests swap it out to capture calls without touching the real
|
|
// eventsource manager.
|
|
var broadcastFn = defaultBroadcast
|
|
|
|
func defaultBroadcast(uids []int64, event *eventsource.Event) {
|
|
mgr := eventsource.GetManager()
|
|
for _, uid := range uids {
|
|
mgr.SendMessage(uid, event)
|
|
}
|
|
}
|
|
|
|
// connectedUIDsLister returns the uid set the broadcast helpers should
|
|
// consider as candidate recipients. Tests override it to feed a
|
|
// deterministic list.
|
|
var connectedUIDsLister = func() []int64 {
|
|
return eventsource.GetManager().ConnectedUIDs()
|
|
}
|
|
|
|
// milestoneLookup re-reads a milestone by id from the detached context.
|
|
// Stubbable in tests so PublishMilestoneProgress can be exercised
|
|
// without a database.
|
|
var milestoneLookup = issues_model.GetMilestoneByID
|
|
|
|
// repoLookup loads a repository by id. Stubbable in tests so the
|
|
// access-filter logic can be exercised without spinning up a database.
|
|
var repoLookup = repo_model.GetRepositoryByID
|
|
|
|
// repoAccessChecker decides whether the user identified by uid is allowed
|
|
// to read the given repository's issues. Tests stub this to bypass the
|
|
// real permission system.
|
|
var repoAccessChecker = canReadMilestones
|
|
|
|
// connectedUIDsWithRepoIssueAccess returns the subset of currently
|
|
// connected uids that the access checker confirms can read the issues
|
|
// unit of repoID.
|
|
func connectedUIDsWithRepoIssueAccess(ctx context.Context, repoID int64) []int64 {
|
|
uids := connectedUIDsLister()
|
|
if len(uids) == 0 {
|
|
return nil
|
|
}
|
|
repo, err := repoLookup(ctx, repoID)
|
|
if err != nil {
|
|
log.Debug("milestone_events: GetRepositoryByID(%d) failed: %v", repoID, err)
|
|
return nil
|
|
}
|
|
allowed := make([]int64, 0, len(uids))
|
|
for _, uid := range uids {
|
|
ok, err := repoAccessChecker(ctx, uid, repo)
|
|
if err != nil {
|
|
log.Debug("milestone_events: access check uid=%d repo=%d: %v", uid, repoID, err)
|
|
continue
|
|
}
|
|
if ok {
|
|
allowed = append(allowed, uid)
|
|
}
|
|
}
|
|
return allowed
|
|
}
|
|
|
|
// canReadMilestones implements the real read-permission check used in
|
|
// production: a user may see milestone progress for a repo when they can
|
|
// read its issues unit.
|
|
func canReadMilestones(ctx context.Context, uid int64, repo *repo_model.Repository) (bool, error) {
|
|
user, err := user_model.GetUserByID(ctx, uid)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
// AccessModeRead == 1; the literal mirrors project_events, where the
|
|
// perm_model typed constant would force another import alias and the
|
|
// meaning is well established here.
|
|
return access_model.HasAccessUnit(ctx, user, repo, unit.TypeIssues, 1)
|
|
}
|
|
|
|
// publishEvent is the shared pipeline used by every Publish* helper.
|
|
// It marshals the payload, builds the SSE Event, looks up authorized
|
|
// recipients, and fans the event out via broadcastFn.
|
|
func publishEvent(ctx context.Context, repoID int64, payload any) {
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
log.Error("milestone_events: marshal payload for repo %d: %v", repoID, err)
|
|
return
|
|
}
|
|
event := &eventsource.Event{
|
|
Name: eventName(repoID),
|
|
Data: data,
|
|
}
|
|
uids := connectedUIDsWithRepoIssueAccess(ctx, repoID)
|
|
if len(uids) == 0 {
|
|
return
|
|
}
|
|
broadcastFn(uids, event)
|
|
}
|
|
|
|
// eventName builds the SSE event name for a repository: the fixed prefix
// "repo-milestones." followed by the repo id in base 10.
func eventName(repoID int64) string {
	const prefix = "repo-milestones."
	id := strconv.FormatInt(repoID, 10)
	return prefix + id
}
|
|
|
|
// Publishers -----------------------------------------------------------------
|
|
|
|
// PublishMilestoneProgress re-reads the milestone's fresh counters and
|
|
// fans a MilestoneProgress event out to everyone who can read the repo's
|
|
// issues. The session tag is resolved synchronously from the request
|
|
// context before the goroutine starts; the goroutine itself runs on a
|
|
// detached, process-lifetime context so the request-scoped DB session
|
|
// being returned to the pool cannot make the re-fetch/access checks fail.
|
|
func PublishMilestoneProgress(ctx context.Context, milestoneID int64) {
|
|
if milestoneID <= 0 {
|
|
return
|
|
}
|
|
tag := sessiontag.SessionTagFromContext(ctx)
|
|
go func() {
|
|
detachCtx := detach(ctx)
|
|
m, err := milestoneLookup(detachCtx, milestoneID)
|
|
if err != nil {
|
|
log.Debug("milestone_events: GetMilestoneByID(%d) failed: %v", milestoneID, err)
|
|
return
|
|
}
|
|
payload := MilestoneProgress{
|
|
RepoID: m.RepoID,
|
|
MilestoneID: m.ID,
|
|
OpenIssues: m.NumOpenIssues,
|
|
ClosedIssues: m.NumClosedIssues,
|
|
Completeness: m.Completeness,
|
|
SessionTag: tag,
|
|
}
|
|
publishEvent(detachCtx, m.RepoID, payload)
|
|
}()
|
|
}
|
|
|
|
// PublishMilestoneDeleted fans a MilestoneDeleted event out for the given
|
|
// repo/milestone. No re-fetch is needed since the milestone is gone.
|
|
func PublishMilestoneDeleted(ctx context.Context, repoID, milestoneID int64) {
|
|
if repoID <= 0 || milestoneID <= 0 {
|
|
return
|
|
}
|
|
go func() {
|
|
detachCtx := detach(ctx)
|
|
publishEvent(detachCtx, repoID, MilestoneDeleted{
|
|
RepoID: repoID,
|
|
MilestoneID: milestoneID,
|
|
})
|
|
}()
|
|
}
|
|
|
|
// detach returns a context safe for use in the fire-and-forget publish
|
|
// goroutine. The request's context carries a request-scoped DB session
|
|
// that is returned to the pool once the HTTP handler completes; reusing
|
|
// it from the goroutine races with that teardown and makes subsequent
|
|
// queries (GetMilestoneByID, GetRepositoryByID, access checks) fail
|
|
// intermittently. The session tag is already resolved synchronously
|
|
// before the goroutine starts, so the goroutine needs no request-scoped
|
|
// values — only a clean, process-lifetime DB context. ShutdownContext is
|
|
// backed by the global engine, outlives any single request, and is
|
|
// cancelled on app shutdown so we don't leak goroutines past teardown.
|
|
func detach(_ context.Context) context.Context {
|
|
return graceful.GetManager().ShutdownContext()
|
|
}
|