feat: Refactor task execution to use Plug interface and update algorithm implementation
All checks were successful
Gitea Build Check / Build (push) Successful in 1m3s

This commit is contained in:
SoXX 2024-10-25 22:33:13 +02:00
parent 684d52d64c
commit 41882b9bfb
4 changed files with 152 additions and 31 deletions

View File

@ -1,12 +0,0 @@
package otter
import (
"context"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
)
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {
return database.Connect(ctx, config)
}

129
pkg/plug/algorithm.go Normal file
View File

@ -0,0 +1,129 @@
package plug
import (
"context"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"gorm.io/gorm"
)
type User struct {
userFavoriteCount int64
userName string
userID string
}
type Favorites struct {
posts []models.Post
nextPage string
lastPage string
}
type Plug interface {
// GetFavoritePage
// The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used
GetFavoritePage(ctx context.Context, apiKey string, userSource models.UserSource, pageIdentifier string) (Favorites, error)
// GetUserProfile
// The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used
GetUserProfile(ctx context.Context, apiKey string, userSource models.UserSource) (User, error)
}
func Algorithm(ctx context.Context, plugInterface Plug, db *gorm.DB, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) {
ctx, span := tracer.Start(ctx, "mainScrapeAlgorithm")
defer span.End()
span.SetAttributes(
attribute.String("user_source_id", string(userSource.ID)),
attribute.String("user_source_user_id", string(userSource.UserID)),
attribute.String("user_source_source_id", string(userSource.SourceID)),
)
basicLoggingInfo := log.Fields{
"user_source_id": userSource.ID,
"user_source_user_id": userSource.UserID,
"user_source_source_id": userSource.SourceID,
}
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Starting mainScrapeAlgorithm")
taskSummery := TaskSummery{
AddedPosts: 0,
DeletedPosts: 0,
}
anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(ctx, db, userSource.UserID, userSource.ID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to get user favorite count from db")
return taskSummery, err
}
profile, err := plugInterface.GetUserProfile(ctx, apiKey, userSource)
if err != nil {
return taskSummery, err
}
nextPage := ""
outer:
for anthroveUserFavCount < profile.userFavoriteCount {
select {
case <-ctx.Done():
break outer
default:
span.AddEvent("Executing getFavorites request")
favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage)
span.AddEvent("Finished executing getFavorites request")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page")
return taskSummery, err
}
if len(favorites.posts) <= 0 {
span.AddEvent("No more favorites found")
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
break outer
}
summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.posts)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing")
return taskSummery, err
}
if summery.AddedFavorites != int64(len(favorites.posts)) {
span.AddEvent("user has no more favorites to add")
break outer
}
nextPage = favorites.nextPage
taskSummery.AddedPosts += int(summery.AddedFavorites)
}
}
span.AddEvent("Completed scraping algorithm")
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Completed scraping algorithm")
return taskSummery, nil
}
// getUserFavoriteCountFromDatabase
func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) {
var count int64
err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? AND user_source_id = ?", userID, userSourceID).Count(&count).Error
if err != nil {
return count, err
}
return count, nil
}

View File

@ -18,16 +18,16 @@ import (
type server struct {
gRPC.UnimplementedPlugConnectorServer
ctx map[string]context.CancelFunc
taskExecutionFunction TaskExecution
plugInterface Plug
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
source models.Source
}
func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
func NewGrpcServer(source models.Source, plugAPIInterface Plug, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
return &server{
ctx: make(map[string]context.CancelFunc),
taskExecutionFunction: taskExecutionFunction,
plugInterface: plugAPIInterface,
sendMessageExecution: sendMessageExecution,
getMessageExecution: getMessageExecution,
source: source,
@ -116,7 +116,12 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
go func() {
var err error
taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey)
gorm, err := database.GetGorm(taskCtx)
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Failed to get Gorm client")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
taskSummery, err := Algorithm(taskCtx, s.plugInterface, gorm, userSource, creation.DeepScrape, creation.ApiKey)
if err != nil {
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)

View File

@ -28,14 +28,13 @@ type TaskSummery struct {
DeletedPosts int
}
type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error)
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
var (
taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
plugAPIInterface Plug
)
func Listen(ctx context.Context, listenAddr string, source models.Source) error {
@ -78,7 +77,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, plugAPIInterface, sendMessageExecution, getMessageExecution))
go func() {
err = grpcServer.Serve(lis)
@ -98,8 +97,8 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
return nil
}
func SetTaskExecutionFunction(function TaskExecution) {
taskExecutionFunction = function
func SetTaskExecutionFunction(plugInterface Plug) {
plugAPIInterface = plugInterface
}
func SetSendMessageExecutionFunction(function SendMessageExecution) {