feat: Add ScrapeHistory tracking and refactor task execution return type
All checks were successful
Gitea Build Check / Build (push) Successful in 58s

- Add `ScrapeHistory` record creation and update during task execution
- Modify `TaskExecution` function to return `TaskSummery` instead of using a cancel function
- Reintroduce and move `removeTask` function for cleanup after task execution
This commit is contained in:
SoXX 2024-10-15 16:08:34 +02:00
parent 9132779673
commit 9de22d3176
2 changed files with 47 additions and 13 deletions

View File

@ -3,6 +3,7 @@ package plug
import ( import (
"context" "context"
"errors" "errors"
"time"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
@ -47,6 +48,21 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
} }
span.AddEvent("Generated task ID", trace.WithAttributes(attribute.String("task_id", id))) span.AddEvent("Generated task ID", trace.WithAttributes(attribute.String("task_id", id)))
scrapeTaskHistory := models.ScrapeHistory{
ScrapeTaskID: models.ScrapeTaskID(id),
UserSourceID: models.UserSourceID(creation.UserSourceId),
}
scrapeTaskHistory, err = database.CreateScrapeHistory(ctx, scrapeTaskHistory)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.AddEvent("Creates ScrapeTaskHistory", trace.WithAttributes(
attribute.String("user_source_id", creation.UserSourceId),
attribute.String("scrape_task_id", id),
))
plugTaskState.TaskId = id plugTaskState.TaskId = id
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
@ -84,9 +100,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
}).Debug("Starting task") }).Debug("Starting task")
go func() { go func() {
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey)
s.removeTask(id)
})
if err != nil { if err != nil {
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err) span.RecordError(err)
@ -95,6 +109,21 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully") log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully")
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
} }
err = database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{
ScrapeTaskID: models.ScrapeTaskID(id),
UserSourceID: userSource.ID,
FinishedAt: time.Now(),
Error: err.Error(),
AddedPosts: taskSummery.AddedPosts,
DeletedPosts: taskSummery.DeletedPosts,
})
if err != nil {
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
s.removeTask(id)
}() }()
span.SetAttributes(attribute.String("task_id", id)) span.SetAttributes(attribute.String("task_id", id))
@ -147,15 +176,6 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
return &plugTaskState, nil return &plugTaskState, nil
} }
func (s *server) removeTask(taskID string) {
fn, exists := s.ctx[taskID]
if !exists {
return
}
fn()
delete(s.ctx, taskID)
}
func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
ctx, span := tracer.Start(ctx, "GetUserMessages") ctx, span := tracer.Start(ctx, "GetUserMessages")
defer span.End() defer span.End()
@ -248,3 +268,12 @@ func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongRe
return &pong, nil return &pong, nil
} }
func (s *server) removeTask(taskID string) {
fn, exists := s.ctx[taskID]
if !exists {
return
}
fn()
delete(s.ctx, taskID)
}

View File

@ -23,7 +23,12 @@ type Message struct {
CreatedAt *timestamppb.Timestamp CreatedAt *timestamppb.Timestamp
} }
type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error type TaskSummery struct {
AddedPosts int
DeletedPosts int
}
type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error)
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)