diff --git a/pkg/otter/connect.go b/pkg/otter/connect.go deleted file mode 100644 index cfcf079..0000000 --- a/pkg/otter/connect.go +++ /dev/null @@ -1,12 +0,0 @@ -package otter - -import ( - "context" - - "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" - "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" -) - -func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error { - return database.Connect(ctx, config) -} diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go new file mode 100644 index 0000000..10b5bd4 --- /dev/null +++ b/pkg/plug/algorithm.go @@ -0,0 +1,129 @@ +package plug + +import ( + "context" + "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "gorm.io/gorm" +) + +type User struct { + userFavoriteCount int64 + userName string + userID string +} + +type Favorites struct { + posts []models.Post + nextPage string + lastPage string +} + +type Plug interface { + // GetFavoritePage + // The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used + GetFavoritePage(ctx context.Context, apiKey string, userSource models.UserSource, pageIdentifier string) (Favorites, error) + + // GetUserProfile + // The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used + GetUserProfile(ctx context.Context, apiKey string, userSource models.UserSource) (User, error) +} + +func Algorithm(ctx context.Context, plugInterface Plug, db *gorm.DB, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) { + ctx, span := tracer.Start(ctx, "mainScrapeAlgorithm") + defer span.End() + + span.SetAttributes( + attribute.String("user_source_id", string(userSource.ID)), + attribute.String("user_source_user_id", string(userSource.UserID)), + attribute.String("user_source_source_id", string(userSource.SourceID)), + ) + + basicLoggingInfo := log.Fields{ + "user_source_id": userSource.ID, + "user_source_user_id": userSource.UserID, + "user_source_source_id": userSource.SourceID, + } + + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Starting mainScrapeAlgorithm") + + taskSummery := TaskSummery{ + AddedPosts: 0, + DeletedPosts: 0, + } + + anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(ctx, db, userSource.UserID, userSource.ID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to get user favorite count from db") + return taskSummery, err + } + + profile, err := plugInterface.GetUserProfile(ctx, apiKey, userSource) + if err != nil { + return taskSummery, err + } + + nextPage := "" + +outer: + for anthroveUserFavCount < profile.userFavoriteCount { + select { + case <-ctx.Done(): + break outer + default: + span.AddEvent("Executing getFavorites request") + + favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) + span.AddEvent("Finished executing getFavorites request") + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") + return taskSummery, err + } + + if len(favorites.posts) <= 0 { + span.AddEvent("No more favorites found") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } + + summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.posts) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") + return taskSummery, err + } + + if summery.AddedFavorites != int64(len(favorites.posts)) { + span.AddEvent("user has no more favorites to add") + break outer + } + + nextPage = favorites.nextPage + taskSummery.AddedPosts += int(summery.AddedFavorites) + } + } + + span.AddEvent("Completed scraping algorithm") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Completed scraping algorithm") + return taskSummery, nil +} + +// getUserFavoriteCountFromDatabase +func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) { + var count int64 + + err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? AND user_source_id = ?", userID, userSourceID).Count(&count).Error + if err != nil { + return count, err + } + + return count, nil +} diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index e7ec2e6..379671a 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -17,20 +17,20 @@ import ( type server struct { gRPC.UnimplementedPlugConnectorServer - ctx map[string]context.CancelFunc - taskExecutionFunction TaskExecution - sendMessageExecution SendMessageExecution - getMessageExecution GetMessageExecution - source models.Source + ctx map[string]context.CancelFunc + plugInterface Plug + sendMessageExecution SendMessageExecution + getMessageExecution GetMessageExecution + source models.Source } -func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer { +func NewGrpcServer(source models.Source, plugAPIInterface Plug, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer { return &server{ - ctx: make(map[string]context.CancelFunc), - taskExecutionFunction: taskExecutionFunction, - sendMessageExecution: sendMessageExecution, - getMessageExecution: getMessageExecution, - source: source, + ctx: make(map[string]context.CancelFunc), + plugInterface: plugAPIInterface, + sendMessageExecution: sendMessageExecution, + getMessageExecution: getMessageExecution, + source: source, } } @@ -116,7 +116,12 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) go func() { var err error - taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey) + gorm, err := database.GetGorm(taskCtx) + log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Failed to get Gorm client") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + taskSummery, err := Algorithm(taskCtx, s.plugInterface, gorm, userSource, creation.DeepScrape, creation.ApiKey) if err != nil { log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") span.RecordError(err) diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 24d3b80..6e329c6 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -28,14 +28,13 @@ type TaskSummery struct { DeletedPosts int } -type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) var ( - taskExecutionFunction TaskExecution - sendMessageExecution SendMessageExecution - getMessageExecution GetMessageExecution + sendMessageExecution SendMessageExecution + getMessageExecution GetMessageExecution + plugAPIInterface Plug ) func Listen(ctx context.Context, listenAddr string, source models.Source) error { @@ -78,7 +77,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error grpc.StatsHandler(otelgrpc.NewServerHandler()), ) - pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) + pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, plugAPIInterface, sendMessageExecution, getMessageExecution)) go func() { err = grpcServer.Serve(lis) @@ -98,8 +97,8 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error return nil } -func SetTaskExecutionFunction(function TaskExecution) { - taskExecutionFunction = function +func SetTaskExecutionFunction(plugInterface Plug) { + plugAPIInterface = plugInterface } func SetSendMessageExecutionFunction(function SendMessageExecution) {