9ee232b18d
The publish goroutine inherited the request context via
context.WithoutCancel. That context carries a request-scoped DB
session returned to the pool when the HTTP handler completes, so
GetProjectByID + access checks in the goroutine raced with session
teardown and intermittently returned empty recipient sets (uids=[]),
silently dropping every SSE board event. Root the detached context in
graceful.ShutdownContext() (global engine, process lifetime).
(cherry picked from commit bfc10289e6)
334 lines
12 KiB
Go
334 lines
12 KiB
Go
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
|
|
|
|
// Package project_events publishes project board mutations as Server-Sent
// Events so other browser tabs viewing the same board can update their DOM
// in near real time.
//
// Each public Publish* helper marshals a typed payload to JSON, wraps it in
// an *eventsource.Event whose Name is "project-board.{project_id}", and
// fans the event out to every currently connected user that has read
// access to the project. All publish helpers are non-blocking: they spawn
// a goroutine so request handlers do not stall on slow consumers.
|
|
package project_events
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
|
|
access_model "code.gitea.io/gitea/models/perm/access"
|
|
project_model "code.gitea.io/gitea/models/project"
|
|
repo_model "code.gitea.io/gitea/models/repo"
|
|
"code.gitea.io/gitea/models/unit"
|
|
user_model "code.gitea.io/gitea/models/user"
|
|
"code.gitea.io/gitea/modules/eventsource"
|
|
"code.gitea.io/gitea/modules/graceful"
|
|
"code.gitea.io/gitea/modules/json"
|
|
"code.gitea.io/gitea/modules/log"
|
|
)
|
|
|
|
// sessionTagCtxKey is the unexported context key type under which the
// X-Session-Tag value of the originating HTTP request is stored.
// WithSessionTag writes the value and SessionTagFromContext reads it back,
// so publishers can attach the tag to outgoing events and the originating
// browser tab can suppress its own echo.
type sessionTagCtxKey struct{}
|
|
|
|
// WithSessionTag returns ctx decorated with the provided session tag.
|
|
// Web/API middleware reads the X-Session-Tag header and calls this so
|
|
// service- and model-layer publishers can pull the tag back out.
|
|
func WithSessionTag(ctx context.Context, tag string) context.Context {
|
|
if tag == "" {
|
|
return ctx
|
|
}
|
|
return context.WithValue(ctx, sessionTagCtxKey{}, tag)
|
|
}
|
|
|
|
// SessionTagFromContext returns the session tag previously stored via
|
|
// WithSessionTag, or "" when none was set.
|
|
func SessionTagFromContext(ctx context.Context) string {
|
|
if v, ok := ctx.Value(sessionTagCtxKey{}).(string); ok {
|
|
return v
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// Event payload structs ------------------------------------------------------
|
|
|
|
// CardMoved is the payload emitted when an issue is moved between columns
// or reordered within a column.
//
// It serialises to the JSON keys project_id, issue_id, from_column_id,
// to_column_id and sorting. session_tag is left out of the JSON when
// SessionTag is empty; PublishCardMoved fills an empty SessionTag from the
// request context before the event is sent.
type CardMoved struct {
	ProjectID    int64  `json:"project_id"`
	IssueID      int64  `json:"issue_id"`
	FromColumnID int64  `json:"from_column_id"`
	ToColumnID   int64  `json:"to_column_id"`
	Sorting      int64  `json:"sorting"`
	SessionTag   string `json:"session_tag,omitempty"`
}
|
|
|
|
// CardLinked is the payload emitted when an issue is added to a project's
// default column.
//
// It serialises to the JSON keys project_id, issue_id and column_id.
// session_tag is left out of the JSON when SessionTag is empty;
// PublishCardLinked fills an empty SessionTag from the request context
// before the event is sent.
type CardLinked struct {
	ProjectID  int64  `json:"project_id"`
	IssueID    int64  `json:"issue_id"`
	ColumnID   int64  `json:"column_id"`
	SessionTag string `json:"session_tag,omitempty"`
}
|
|
|
|
// CardUnlinked is the payload emitted when an issue is removed from a
// project.
//
// It serialises to the JSON keys project_id and issue_id. session_tag is
// left out of the JSON when SessionTag is empty; PublishCardUnlinked fills
// an empty SessionTag from the request context before the event is sent.
type CardUnlinked struct {
	ProjectID  int64  `json:"project_id"`
	IssueID    int64  `json:"issue_id"`
	SessionTag string `json:"session_tag,omitempty"`
}
|
|
|
|
// ColumnCreated is the payload emitted when a new column is added to a
// project.
//
// It serialises to the JSON keys project_id, column_id, title, color,
// sorting and is_default. Every key is always present; none of the fields
// use omitempty.
type ColumnCreated struct {
	ProjectID int64  `json:"project_id"`
	ColumnID  int64  `json:"column_id"`
	Title     string `json:"title"`
	Color     string `json:"color"`
	Sorting   int64  `json:"sorting"`
	IsDefault bool   `json:"is_default"`
}
|
|
|
|
// ColumnUpdated is the payload emitted when a column's title, color, or
// sorting changes.
//
// It serialises to the JSON keys project_id, column_id, title, color and
// sorting. Every key is always present; none of the fields use omitempty.
type ColumnUpdated struct {
	ProjectID int64  `json:"project_id"`
	ColumnID  int64  `json:"column_id"`
	Title     string `json:"title"`
	Color     string `json:"color"`
	Sorting   int64  `json:"sorting"`
}
|
|
|
|
// ColumnDeleted is the payload emitted when a column is removed from a
// project. It serialises to the JSON keys project_id and column_id.
//
// NOTE(review): the original note states that deletion implicitly
// relocates issues to the default column and that one CardMoved is emitted
// per affected issue, so the frontend only needs to drop the column and
// react to the per-issue moves. PublishColumnDeleted in this package sends
// only the ColumnDeleted event itself; the per-issue CardMoved events are
// presumably published by the caller. Confirm against the call sites.
type ColumnDeleted struct {
	ProjectID int64 `json:"project_id"`
	ColumnID  int64 `json:"column_id"`
}
|
|
|
|
// ColumnSort is one entry in a ColumnReordered batch: a column id paired
// with a sorting value. It serialises to the JSON keys column_id and
// sorting.
type ColumnSort struct {
	ColumnID int64 `json:"column_id"`
	Sorting  int64 `json:"sorting"`
}
|
|
|
|
// ColumnReordered is emitted when columns within a project are dragged into
|
|
// a new order.
|
|
type ColumnReordered struct {
|
|
ProjectID int64 `json:"project_id"`
|
|
Columns []ColumnSort `json:"columns"`
|
|
}
|
|
|
|
// ProjectUpdated is the payload emitted when project metadata (title,
// description, card type, open/closed state) changes.
//
// It serialises to the JSON keys project_id, title, description, card_type
// and is_closed. Every key is always present; none of the fields use
// omitempty.
type ProjectUpdated struct {
	ProjectID   int64  `json:"project_id"`
	Title       string `json:"title"`
	Description string `json:"description"`
	CardType    string `json:"card_type"`
	IsClosed    bool   `json:"is_closed"`
}
|
|
|
|
// ProjectDeleted is the payload emitted when a project is deleted. It
// serialises to a JSON object with the single key project_id.
type ProjectDeleted struct {
	ProjectID int64 `json:"project_id"`
}
|
|
|
|
// Broadcast plumbing ---------------------------------------------------------
|
|
|
|
// broadcastFn is the package-level seam used to send an event to a set of
|
|
// uids. Tests swap it out to capture calls without touching the real
|
|
// eventsource manager.
|
|
var broadcastFn = defaultBroadcast
|
|
|
|
func defaultBroadcast(uids []int64, event *eventsource.Event) {
|
|
mgr := eventsource.GetManager()
|
|
for _, uid := range uids {
|
|
mgr.SendMessage(uid, event)
|
|
}
|
|
}
|
|
|
|
// connectedUIDsLister returns the uid set the broadcast helpers should
|
|
// consider as candidate recipients. Tests override it to feed a
|
|
// deterministic list.
|
|
var connectedUIDsLister = func() []int64 {
|
|
return eventsource.GetManager().ConnectedUIDs()
|
|
}
|
|
|
|
// projectLookup loads a project by id. Stubbable in tests so the
|
|
// access-filter logic can be exercised without spinning up a database.
|
|
var projectLookup = project_model.GetProjectByID
|
|
|
|
// projectAccessChecker decides whether the user identified by uid is
|
|
// allowed to read the given project. Tests stub this to bypass the real
|
|
// permission system.
|
|
var projectAccessChecker = canReadProject
|
|
|
|
// connectedUIDsWithProjectAccess returns the subset of currently connected
|
|
// uids that the access checker confirms can read projectID.
|
|
func connectedUIDsWithProjectAccess(ctx context.Context, projectID int64) []int64 {
|
|
uids := connectedUIDsLister()
|
|
if len(uids) == 0 {
|
|
return nil
|
|
}
|
|
project, err := projectLookup(ctx, projectID)
|
|
if err != nil {
|
|
log.Debug("project_events: GetProjectByID(%d) failed: %v", projectID, err)
|
|
return nil
|
|
}
|
|
allowed := make([]int64, 0, len(uids))
|
|
for _, uid := range uids {
|
|
ok, err := projectAccessChecker(ctx, uid, project)
|
|
if err != nil {
|
|
log.Debug("project_events: access check uid=%d project=%d: %v", uid, projectID, err)
|
|
continue
|
|
}
|
|
if ok {
|
|
allowed = append(allowed, uid)
|
|
}
|
|
}
|
|
return allowed
|
|
}
|
|
|
|
// canReadProject implements the real read-permission check used in
|
|
// production: repo projects defer to the repo's TypeProjects unit access;
|
|
// user / org projects fall back to user visibility.
|
|
func canReadProject(ctx context.Context, uid int64, project *project_model.Project) (bool, error) {
|
|
user, err := user_model.GetUserByID(ctx, uid)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if project.RepoID > 0 {
|
|
var repo *repo_model.Repository
|
|
if project.Repo != nil {
|
|
repo = project.Repo
|
|
} else {
|
|
repo, err = repo_model.GetRepositoryByID(ctx, project.RepoID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
// AccessModeRead == 1; we use the literal because the
|
|
// perm_model package's typed constant would force another
|
|
// import alias and the meaning is well established here.
|
|
ok, err := access_model.HasAccessUnit(ctx, user, repo, unit.TypeProjects, 1)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return ok, nil
|
|
}
|
|
|
|
if project.OwnerID > 0 {
|
|
owner := project.Owner
|
|
if owner == nil {
|
|
owner, err = user_model.GetUserByID(ctx, project.OwnerID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return user_model.IsUserVisibleToViewer(ctx, owner, user), nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// publishEvent is the shared pipeline used by every Publish* helper.
|
|
// It marshals the payload, builds the SSE Event, looks up authorized
|
|
// recipients, and fans the event out via broadcastFn. The whole thing
|
|
// runs inside the calling goroutine; callers should wrap it in `go` so
|
|
// request handlers stay responsive.
|
|
func publishEvent(ctx context.Context, projectID int64, payload any) {
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
log.Error("project_events: marshal payload for project %d: %v", projectID, err)
|
|
return
|
|
}
|
|
event := &eventsource.Event{
|
|
Name: eventName(projectID),
|
|
Data: data,
|
|
}
|
|
uids := connectedUIDsWithProjectAccess(ctx, projectID)
|
|
if len(uids) == 0 {
|
|
return
|
|
}
|
|
broadcastFn(uids, event)
|
|
}
|
|
|
|
// eventName returns the SSE event name for a project: the fixed prefix
// "project-board." followed by projectID in base 10.
func eventName(projectID int64) string {
	return "project-board." + strconv.FormatInt(projectID, 10)
}
|
|
|
|
// Publishers -----------------------------------------------------------------
|
|
|
|
// PublishCardMoved fans out a CardMoved event for the given payload.
|
|
func PublishCardMoved(ctx context.Context, payload CardMoved) {
|
|
if payload.SessionTag == "" {
|
|
payload.SessionTag = SessionTagFromContext(ctx)
|
|
}
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishCardLinked fans out a CardLinked event for the given payload.
|
|
func PublishCardLinked(ctx context.Context, payload CardLinked) {
|
|
if payload.SessionTag == "" {
|
|
payload.SessionTag = SessionTagFromContext(ctx)
|
|
}
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishCardUnlinked fans out a CardUnlinked event for the given payload.
|
|
func PublishCardUnlinked(ctx context.Context, payload CardUnlinked) {
|
|
if payload.SessionTag == "" {
|
|
payload.SessionTag = SessionTagFromContext(ctx)
|
|
}
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishColumnCreated fans out a ColumnCreated event for the given payload.
|
|
func PublishColumnCreated(ctx context.Context, payload ColumnCreated) {
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishColumnUpdated fans out a ColumnUpdated event for the given payload.
|
|
func PublishColumnUpdated(ctx context.Context, payload ColumnUpdated) {
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishColumnDeleted fans out a ColumnDeleted event for the given payload.
|
|
func PublishColumnDeleted(ctx context.Context, payload ColumnDeleted) {
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishColumnReordered fans out a ColumnReordered event for the given payload.
|
|
func PublishColumnReordered(ctx context.Context, payload ColumnReordered) {
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishProjectUpdated fans out a ProjectUpdated event for the given payload.
|
|
func PublishProjectUpdated(ctx context.Context, payload ProjectUpdated) {
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// PublishProjectDeleted fans out a ProjectDeleted event for the given payload.
|
|
func PublishProjectDeleted(ctx context.Context, payload ProjectDeleted) {
|
|
go publishEvent(detach(ctx), payload.ProjectID, payload)
|
|
}
|
|
|
|
// detach returns a context safe for use in the fire-and-forget publish
|
|
// goroutine. The request's context carries a request-scoped DB session
|
|
// that is returned to the pool once the HTTP handler completes; reusing
|
|
// it from the goroutine races with that teardown and makes subsequent
|
|
// queries (GetProjectByID, access checks) fail intermittently. The
|
|
// session tag is already resolved synchronously before the goroutine
|
|
// starts, so the goroutine needs no request-scoped values — only a
|
|
// clean, process-lifetime DB context. ShutdownContext is backed by the
|
|
// global engine, outlives any single request, and is cancelled on app
|
|
// shutdown so we don't leak goroutines past teardown.
|
|
func detach(_ context.Context) context.Context {
|
|
return graceful.GetManager().ShutdownContext()
|
|
}
|