New and generic scrape algorithm #11
@ -7,7 +7,7 @@ Anthrove Plug SDK is a Golang-based Software Development Kit (SDK) that provides
|
|||||||
To install the Anthrove Plug SDK, you will need to have Go installed on your system. You can then use the go get command to fetch the SDK:
|
To install the Anthrove Plug SDK, you will need to have Go installed on your system. You can then use the go get command to fetch the SDK:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
go get git.anthrove.art/Anthrove/plug-sdk/v4
|
go get git.anthrove.art/Anthrove/plug-sdk/v5
|
||||||
```
|
```
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
@ -141,7 +141,7 @@ func main() {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
plug.SetTaskExecutionFunction(service.YourTaskFunction)
|
plug.RegisterPlugInterface(service.plugInterface)
|
||||||
plug.SetGetMessageExecutionFunction(service.YourMessageFunction)
|
plug.SetGetMessageExecutionFunction(service.YourMessageFunction)
|
||||||
err = plug.Listen(ctx, ":8080", source)
|
err = plug.Listen(ctx, ":8080", source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
2
go.mod
2
go.mod
@ -1,4 +1,4 @@
|
|||||||
module git.anthrove.art/Anthrove/plug-sdk/v4
|
module git.anthrove.art/Anthrove/plug-sdk/v5
|
||||||
|
|
||||||
go 1.23.0
|
go 1.23.0
|
||||||
|
|
||||||
|
@ -1,12 +0,0 @@
|
|||||||
package otter
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {
|
|
||||||
return database.Connect(ctx, config)
|
|
||||||
}
|
|
159
pkg/plug/algorithm.go
Normal file
159
pkg/plug/algorithm.go
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
package plug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"slices"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// User is the profile information a Plug implementation reports for an
// account on the remote source.
type User struct {
	// UserFavoriteCount is the number of favorites the source reports for the
	// account. The scrape algorithm only uses it for its early-exit check when
	// the value is greater than zero.
	UserFavoriteCount int64

	// UserName is copied into UserSource.AccountUsername by the scrape algorithm.
	UserName string

	// UserID is copied into UserSource.AccountID by the scrape algorithm.
	UserID string
}
|
||||||
|
|
||||||
|
type Favorites struct {
|
||||||
|
Posts []models.Post
|
||||||
|
NextPage string
|
||||||
|
LastPage string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Plug interface {
|
||||||
|
// GetFavoritePage
|
||||||
|
// The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used
|
||||||
|
GetFavoritePage(ctx context.Context, apiKey string, userSource models.UserSource, pageIdentifier string) (Favorites, error)
|
||||||
|
|
||||||
|
// GetUserProfile
|
||||||
|
// The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used
|
||||||
|
GetUserProfile(ctx context.Context, apiKey string, userSource models.UserSource) (User, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSource, anthroveUserFavCount int64, deepScrape bool, apiKey string) (TaskSummery, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "mainScrapeAlgorithm")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
span.SetAttributes(
|
||||||
|
attribute.String("user_source_id", string(userSource.ID)),
|
||||||
|
attribute.String("user_source_user_id", string(userSource.UserID)),
|
||||||
|
attribute.String("user_source_source_id", string(userSource.SourceID)),
|
||||||
|
)
|
||||||
|
|
||||||
|
basicLoggingInfo := log.Fields{
|
||||||
|
"user_source_id": userSource.ID,
|
||||||
|
"user_source_user_id": userSource.UserID,
|
||||||
|
"user_source_source_id": userSource.SourceID,
|
||||||
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Starting mainScrapeAlgorithm")
|
||||||
|
|
||||||
|
taskSummery := TaskSummery{
|
||||||
|
AddedPosts: 0,
|
||||||
|
DeletedPosts: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
profile, err := plugInterface.GetUserProfile(ctx, apiKey, userSource)
|
||||||
|
if err != nil {
|
||||||
|
return taskSummery, err
|
||||||
|
}
|
||||||
|
|
||||||
Alphyron marked this conversation as resolved
Outdated
|
|||||||
|
userSource.AccountID = profile.UserID
|
||||||
Alphyron marked this conversation as resolved
Outdated
Alphyron
commented
Issue still exists Issue still exists
https://git.anthrove.art/Anthrove/plug-e621/issues/15
|
|||||||
|
userSource.AccountUsername = profile.UserName
|
||||||
|
|
||||||
|
nextPage := ""
|
||||||
|
var newPosts []models.Post
|
||||||
|
var anthroveFaves []models.UserFavorite
|
||||||
|
|
||||||
|
outer:
|
||||||
|
for {
|
||||||
|
if anthroveUserFavCount >= profile.UserFavoriteCount && profile.UserFavoriteCount > 0 {
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
break outer
|
||||||
|
default:
|
||||||
|
span.AddEvent("Executing getFavorites request")
|
||||||
|
favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage)
|
||||||
|
span.AddEvent("Finished executing getFavorites request")
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page")
|
||||||
|
return taskSummery, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(favorites.Posts) == 0 {
|
||||||
|
span.AddEvent("No more favorites found")
|
||||||
|
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddEvent("Executing BatchPostProcessingWithSummery")
|
||||||
|
pageNewPosts, pageAnthroveFaves, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts)
|
||||||
|
|
||||||
|
anthroveFaves = append(anthroveFaves, pageAnthroveFaves...)
|
||||||
|
newPosts = append(newPosts, pageNewPosts...)
|
||||||
|
|
||||||
|
span.AddEvent("Finished executing BatchPostProcessingWithSummery")
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing")
|
||||||
|
return taskSummery, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pageAnthroveFaves) <= 0 || len(favorites.Posts) != len(pageAnthroveFaves) {
|
||||||
|
span.AddEvent("No more favorites found to add")
|
||||||
|
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
|
||||||
|
nextPage = favorites.NextPage
|
||||||
|
taskSummery.AddedPosts += len(pageAnthroveFaves)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(newPosts) > 0 {
|
||||||
|
span.AddEvent("Executing CreatePostInBatch")
|
||||||
|
err = database.CreatePostInBatch(ctx, newPosts, BatchSize)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch")
|
||||||
|
return taskSummery, err
|
||||||
|
}
|
||||||
|
span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
|
||||||
|
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(anthroveFaves) > 0 {
|
||||||
|
span.AddEvent("Executing CreateUserFavoriteInBatch")
|
||||||
|
|
||||||
|
for i, fav := range anthroveFaves {
|
||||||
|
fav.CreatedAt = time.Now().Add(time.Millisecond * time.Duration(i) * -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
slices.Reverse(anthroveFaves)
|
||||||
|
|
||||||
|
err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch")
|
||||||
|
return taskSummery, err
|
||||||
|
}
|
||||||
|
span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
|
||||||
|
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch")
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddEvent("Completed scraping algorithm")
|
||||||
|
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Completed scraping algorithm")
|
||||||
|
return taskSummery, nil
|
||||||
|
}
|
@ -3,11 +3,12 @@ package plug
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"gorm.io/gorm"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
gRPC "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/grpc"
|
gRPC "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/grpc"
|
||||||
gonanoid "github.com/matoous/go-nanoid/v2"
|
gonanoid "github.com/matoous/go-nanoid/v2"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
@ -17,20 +18,20 @@ import (
|
|||||||
|
|
||||||
type server struct {
|
type server struct {
|
||||||
gRPC.UnimplementedPlugConnectorServer
|
gRPC.UnimplementedPlugConnectorServer
|
||||||
ctx map[string]context.CancelFunc
|
ctx map[string]context.CancelFunc
|
||||||
taskExecutionFunction TaskExecution
|
plugInterface Plug
|
||||||
sendMessageExecution SendMessageExecution
|
sendMessageExecution SendMessageExecution
|
||||||
getMessageExecution GetMessageExecution
|
getMessageExecution GetMessageExecution
|
||||||
source models.Source
|
source models.Source
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
|
func NewGrpcServer(source models.Source, plugAPIInterface Plug, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
|
||||||
return &server{
|
return &server{
|
||||||
ctx: make(map[string]context.CancelFunc),
|
ctx: make(map[string]context.CancelFunc),
|
||||||
taskExecutionFunction: taskExecutionFunction,
|
plugInterface: plugAPIInterface,
|
||||||
sendMessageExecution: sendMessageExecution,
|
sendMessageExecution: sendMessageExecution,
|
||||||
getMessageExecution: getMessageExecution,
|
getMessageExecution: getMessageExecution,
|
||||||
source: source,
|
source: source,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,9 +115,38 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
|||||||
"user_source_id": creation.UserSourceId,
|
"user_source_id": creation.UserSourceId,
|
||||||
}).Debug("Starting task")
|
}).Debug("Starting task")
|
||||||
|
|
||||||
|
db, err := database.GetGorm(taskCtx)
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
dberr := database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{
|
||||||
|
ScrapeTaskID: models.ScrapeTaskID(id),
|
||||||
|
UserSourceID: userSource.ID,
|
||||||
|
FinishedAt: time.Now(),
|
||||||
|
Error: errorString(err),
|
||||||
|
})
|
||||||
|
return &plugTaskState, errors.Join(err, dberr)
|
||||||
|
}
|
||||||
|
|
||||||
|
anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(taskCtx, db, userSource.UserID, userSource.ID)
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
dberr := database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{
|
||||||
|
ScrapeTaskID: models.ScrapeTaskID(id),
|
||||||
|
UserSourceID: userSource.ID,
|
||||||
|
FinishedAt: time.Now(),
|
||||||
|
Error: errorString(err),
|
||||||
|
})
|
||||||
|
return &plugTaskState, errors.Join(err, dberr)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var err error
|
var err error
|
||||||
taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey)
|
|
||||||
|
taskSummery, err := algorithm(taskCtx, s.plugInterface, userSource, anthroveUserFavCount, creation.DeepScrape, creation.ApiKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
|
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
@ -301,3 +331,15 @@ func errorString(err error) string {
|
|||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getUserFavoriteCountFromDatabase
|
||||||
|
func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) {
|
||||||
|
var count int64
|
||||||
|
|
||||||
|
err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? AND user_source_id = ?", userID, userSourceID).Count(&count).Error
|
||||||
|
if err != nil {
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
@ -3,7 +3,7 @@ package plug
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/plug-sdk/v4/pkg/telemetry"
|
"git.anthrove.art/Anthrove/plug-sdk/v5/pkg/telemetry"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
)
|
)
|
||||||
|
@ -2,8 +2,6 @@ package plug
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"slices"
|
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@ -11,6 +9,7 @@ import (
|
|||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
"slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
var BatchSize = 50
|
var BatchSize = 50
|
||||||
@ -21,12 +20,7 @@ type BatchSummery struct {
|
|||||||
AddedFavorites int64
|
AddedFavorites int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func BatchPostProcessing(ctx context.Context, userSource models.UserSource, posts []models.Post) error {
|
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, error) {
|
||||||
_, err := BatchPostProcessingWithSummery(ctx, userSource, posts)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) (BatchSummery, error) {
|
|
||||||
ctx, span := tracer.Start(ctx, "BatchPostProcessing")
|
ctx, span := tracer.Start(ctx, "BatchPostProcessing")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
@ -49,7 +43,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
|||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB")
|
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB")
|
||||||
return BatchSummery{}, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
postIDs := make([]string, 0, len(posts))
|
postIDs := make([]string, 0, len(posts))
|
||||||
@ -59,18 +53,18 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
|||||||
span.AddEvent("Collected post IDs", trace.WithAttributes(attribute.Int("post_count", len(postIDs))))
|
span.AddEvent("Collected post IDs", trace.WithAttributes(attribute.Int("post_count", len(postIDs))))
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("post_count", len(postIDs)).Info("Collected post IDs")
|
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("post_count", len(postIDs)).Info("Collected post IDs")
|
||||||
|
|
||||||
existingPosts, err := getAnthrovePost(ctx, db, userSource.SourceID, postIDs)
|
existingPostsReferences, err := getAnthrovePostReferences(ctx, db, userSource.SourceID, postIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts")
|
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts")
|
||||||
return BatchSummery{}, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts))))
|
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPostsReferences))))
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts")
|
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPostsReferences)).Info("Fetched existing posts")
|
||||||
|
|
||||||
var existingPostIDs []models.PostID
|
var existingPostIDs []models.PostID
|
||||||
for _, post := range existingPosts {
|
for _, post := range existingPostsReferences {
|
||||||
existingPostIDs = append(existingPostIDs, models.PostID(post.PostID))
|
existingPostIDs = append(existingPostIDs, models.PostID(post.PostID))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,15 +74,15 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
|||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts")
|
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts")
|
||||||
return BatchSummery{}, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs))))
|
span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs))))
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts")
|
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts")
|
||||||
|
|
||||||
anthroveFaves := make([]models.UserFavorite, 0, len(existingPosts))
|
anthroveFaves := make([]models.UserFavorite, 0, len(existingPostsReferences))
|
||||||
newPosts := make([]models.Post, 0, len(existingPosts))
|
newPosts := make([]models.Post, 0, len(existingPostsReferences))
|
||||||
for _, post := range posts {
|
for _, post := range posts {
|
||||||
if !slices.ContainsFunc(existingPosts, func(reference models.PostReference) bool {
|
if !slices.ContainsFunc(existingPostsReferences, func(reference models.PostReference) bool {
|
||||||
found := reference.SourcePostID == post.References[0].SourcePostID
|
found := reference.SourcePostID == post.References[0].SourcePostID
|
||||||
if found {
|
if found {
|
||||||
if !slices.Contains(existingFavPostIDs, models.PostID(reference.PostID)) {
|
if !slices.Contains(existingFavPostIDs, models.PostID(reference.PostID)) {
|
||||||
@ -105,6 +99,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
|||||||
UserID: userSource.UserID,
|
UserID: userSource.UserID,
|
||||||
PostID: post.ID,
|
PostID: post.ID,
|
||||||
UserSourceID: userSource.ID,
|
UserSourceID: userSource.ID,
|
||||||
|
UserSource: models.UserSource{},
|
||||||
})
|
})
|
||||||
newPosts = append(newPosts, post)
|
newPosts = append(newPosts, post)
|
||||||
}
|
}
|
||||||
@ -112,41 +107,14 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
|||||||
span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", len(anthroveFaves))))
|
span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", len(anthroveFaves))))
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts")
|
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts")
|
||||||
|
|
||||||
if len(newPosts) > 0 {
|
return newPosts, anthroveFaves, nil
|
||||||
err = database.CreatePostInBatch(ctx, newPosts, BatchSize)
|
|
||||||
if err != nil {
|
|
||||||
span.RecordError(err)
|
|
||||||
span.SetStatus(codes.Error, err.Error())
|
|
||||||
log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch")
|
|
||||||
return BatchSummery{}, err
|
|
||||||
}
|
|
||||||
span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
|
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(anthroveFaves) > 0 {
|
|
||||||
err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize)
|
|
||||||
if err != nil {
|
|
||||||
span.RecordError(err)
|
|
||||||
span.SetStatus(codes.Error, err.Error())
|
|
||||||
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch")
|
|
||||||
return BatchSummery{}, err
|
|
||||||
}
|
|
||||||
span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
|
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch")
|
|
||||||
}
|
|
||||||
|
|
||||||
return BatchSummery{
|
|
||||||
AddedPosts: int64(len(newPosts)),
|
|
||||||
AddedFavorites: int64(len(anthroveFaves)),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAnthrovePost(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {
|
func getAnthrovePostReferences(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {
|
||||||
ctx, span := tracer.Start(ctx, "getAnthrovePost")
|
ctx, span := tracer.Start(ctx, "getAnthrovePostReferences")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePost")
|
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePostReferences")
|
||||||
|
|
||||||
var existingPosts []models.PostReference
|
var existingPosts []models.PostReference
|
||||||
err := gorm.WithContext(ctx).Model(models.PostReference{}).Find(&existingPosts, "source_id = ? AND source_post_id IN ?", id, postIDs).Error
|
err := gorm.WithContext(ctx).Model(models.PostReference{}).Find(&existingPosts, "source_id = ? AND source_post_id IN ?", id, postIDs).Error
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
pb "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/grpc"
|
pb "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/grpc"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
@ -15,7 +15,7 @@ import (
|
|||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
)
|
)
|
||||||
|
|
||||||
var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v4/pkg/plug")
|
var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v5/pkg/plug")
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
Title string
|
Title string
|
||||||
@ -28,14 +28,13 @@ type TaskSummery struct {
|
|||||||
DeletedPosts int
|
DeletedPosts int
|
||||||
}
|
}
|
||||||
|
|
||||||
type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error)
|
|
||||||
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
|
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
|
||||||
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
|
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
taskExecutionFunction TaskExecution
|
sendMessageExecution SendMessageExecution
|
||||||
sendMessageExecution SendMessageExecution
|
getMessageExecution GetMessageExecution
|
||||||
getMessageExecution GetMessageExecution
|
plugAPIInterface Plug
|
||||||
)
|
)
|
||||||
|
|
||||||
func Listen(ctx context.Context, listenAddr string, source models.Source) error {
|
func Listen(ctx context.Context, listenAddr string, source models.Source) error {
|
||||||
@ -78,7 +77,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
|
|||||||
grpc.StatsHandler(otelgrpc.NewServerHandler()),
|
grpc.StatsHandler(otelgrpc.NewServerHandler()),
|
||||||
)
|
)
|
||||||
|
|
||||||
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
|
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, plugAPIInterface, sendMessageExecution, getMessageExecution))
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err = grpcServer.Serve(lis)
|
err = grpcServer.Serve(lis)
|
||||||
@ -98,8 +97,9 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetTaskExecutionFunction(function TaskExecution) {
|
// RegisterPlugInterface sets the provided Plug interface implementation for task execution.
|
||||||
Alphyron marked this conversation as resolved
Outdated
Alphyron
commented
Irgendwie passt der Name der Funktion nicht zu dem, was gemacht wird. Es wird weder eine Function gesetzt, noch wird ein execution stuff gesetzt.
Hier ist es ja nur ein Interface als Schnittstelle zu der PlugAPI
|
|||||||
taskExecutionFunction = function
|
func RegisterPlugInterface(plugInterface Plug) {
|
||||||
|
plugAPIInterface = plugInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetSendMessageExecutionFunction(function SendMessageExecution) {
|
func SetSendMessageExecutionFunction(function SendMessageExecution) {
|
||||||
|
Loading…
Reference in New Issue
Block a user
Hier muss eine Sonderbedingung gemacht werden, weil UserFavoriteCount ja -1 sein kann (wenn es nicht ausgelesen werden kann).
@SoXX, to fix this problem and the issue one comment below, I would recommend changing the algorithm to the following:
https://www.plantuml.com/plantuml/uml/RP1FRzH03CNl_XIlj-mDhea4BIgWIXMAgChTk-D9HyoVoEFija9yTtOIYTBIkOtzUy_Flgp6QakAT34hJucnLFaXQk70ySQZPA8LeVwsiCDz5Hsr-105Nal269VfQhmPwFJGQftf8ZiwFw3_AeOlV1nvsk1LFH2mUSbZg1RoXB5KgnlHs7__rv_-vvkdRFqtFPgcYQwSGoxsch62APOzHypdF-AvERo9RsEUSS_7bKPNr8aYL8Gq5pNETe4i9wa67xJQRa1B43owpo_TFk3T3hy80FOg_9E0tslOQ_4X2xx9esr7i8AxW_8i0qbswtM9-liv5cuvywkr1kg_or6qIfk4spLdBYUKw9vpzNyTjZXTi1Thm1v4fPKODN6CSC5xbNmGxCLE8haX2LtYfxtWFLBzkDji7Pj0nHRDk5jIOdtgYQgLcIubkoN5Fm00
fixed an issue @SoXX
https://plantuml.github.io/plantuml-core/raw.html?ZL9DJzmm4BtxLpmkABbKzBefL53Q2WbLKH6zUpVZh5N74uqdk_3lQoTBAGkjUeldVSoRoPoCOll1OahqWqJzneOR1ux69BMYPdNBjiDz8cc5dGy49poW3LD_sTuqPhyjjSgnq8waDWRm3fMDkXNUKH5-iRjFF4N51uoBnxj3cSKhBTZfwJ_02vpLq2r5L2eJrRa9i1QoF_CNnSwxqdVrbHHPsDh-aB8uDK-HvdBBIHFEpuyCyHV7UNXwy4bzx0_YPVvdi_bzVTMyvkSiTC3VyYYQ8hhiEaJMOuuo-i1h6p3cDq86cpVfIvjdkhExcOsAfIE1J_33oABvfvmWqfuovhmahHiyRUVpIxUB_rpGDJaV2T_eGPUTv1Xt6x4ZDIQpvkimPdK_XhAPoF5eoiQilfV3ILGQutQE7NaF3jqVCNyxBEagho-cAac5IiaY3JO4v7nEA6u8Qz8zvJaoPK0rsCSruJs6zB5UR3kh126T9E9k6WOhZqqvVXv_0G00