2024-08-27 07:15:17 +00:00
package plug
import (
"context"
"slices"
2024-08-29 13:26:58 +00:00
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
2024-08-27 07:56:52 +00:00
log "github.com/sirupsen/logrus"
2024-08-27 07:49:20 +00:00
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
2024-08-27 07:15:17 +00:00
"gorm.io/gorm"
)
var BatchSize = 50
2024-08-27 08:04:55 +00:00
var BasicLoggingFields log . Fields
2024-08-27 07:15:17 +00:00
2024-08-27 10:25:00 +00:00
// BatchSummery reports what a batch run added.
//
// AddedPosts is the number of posts that were created, AddedFavorites the number
// of user-favorite rows that were created. The spelling of the type name is part
// of the exported API and is therefore kept as is.
type BatchSummery struct {
	AddedPosts, AddedFavorites int64
}
2024-08-27 07:15:17 +00:00
func BatchPostProcessing ( ctx context . Context , userSource models . UserSource , posts [ ] models . Post ) error {
2024-08-27 10:25:00 +00:00
_ , err := BatchPostProcessingWithSummery ( ctx , userSource , posts )
return err
}
func BatchPostProcessingWithSummery ( ctx context . Context , userSource models . UserSource , posts [ ] models . Post ) ( BatchSummery , error ) {
2024-08-27 07:49:20 +00:00
ctx , span := tracer . Start ( ctx , "BatchPostProcessing" )
defer span . End ( )
2024-08-27 08:04:55 +00:00
span . SetAttributes (
attribute . String ( "user_source_id" , string ( userSource . ID ) ) ,
attribute . String ( "user_source_user_id" , string ( userSource . UserID ) ) ,
attribute . String ( "user_source_source_id" , string ( userSource . SourceID ) ) ,
)
BasicLoggingFields = log . Fields {
"user_source_id" : userSource . ID ,
"user_source_user_id" : userSource . UserID ,
"user_source_source_id" : userSource . SourceID ,
}
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . Info ( "Starting BatchPostProcessing" )
2024-08-27 07:15:17 +00:00
db , err := database . GetGorm ( ctx )
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithError ( err ) . WithFields ( BasicLoggingFields ) . Error ( "Failed to get Gorm DB" )
2024-08-27 10:25:00 +00:00
return BatchSummery { } , err
2024-08-27 07:15:17 +00:00
}
postIDs := make ( [ ] string , 0 , len ( posts ) )
for _ , post := range posts {
postIDs = append ( postIDs , post . References [ 0 ] . SourcePostID )
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Collected post IDs" , trace . WithAttributes ( attribute . Int ( "post_count" , len ( postIDs ) ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . WithField ( "post_count" , len ( postIDs ) ) . Info ( "Collected post IDs" )
2024-08-27 07:15:17 +00:00
2024-08-27 07:49:20 +00:00
existingPosts , err := getAnthrovePost ( ctx , db , userSource . SourceID , postIDs )
2024-08-27 07:15:17 +00:00
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithError ( err ) . WithFields ( BasicLoggingFields ) . Error ( "Failed to fetch existing posts" )
2024-08-27 10:25:00 +00:00
return BatchSummery { } , err
2024-08-27 07:15:17 +00:00
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Fetched existing posts" , trace . WithAttributes ( attribute . Int ( "existing_post_count" , len ( existingPosts ) ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . WithField ( "existing_post_count" , len ( existingPosts ) ) . Info ( "Fetched existing posts" )
2024-08-27 07:15:17 +00:00
var existingPostIDs [ ] models . PostID
for _ , post := range existingPosts {
existingPostIDs = append ( existingPostIDs , models . PostID ( post . PostID ) )
}
var existingFavPostIDs [ ] models . PostID
2024-08-27 07:49:20 +00:00
existingFavPostIDs , err = getAlreadyFavoritesPostIDs ( ctx , db , existingPostIDs , userSource . ID )
2024-08-27 07:15:17 +00:00
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithError ( err ) . WithFields ( BasicLoggingFields ) . Error ( "Failed to fetch existing favorite posts" )
2024-08-27 10:25:00 +00:00
return BatchSummery { } , err
2024-08-27 07:15:17 +00:00
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Fetched existing favorite posts" , trace . WithAttributes ( attribute . Int ( "existing_fav_post_count" , len ( existingFavPostIDs ) ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . WithField ( "existing_fav_post_count" , len ( existingFavPostIDs ) ) . Info ( "Fetched existing favorite posts" )
2024-08-27 07:15:17 +00:00
anthroveFaves := make ( [ ] models . UserFavorite , 0 , len ( existingPosts ) )
newPosts := make ( [ ] models . Post , 0 , len ( existingPosts ) )
for _ , post := range posts {
if ! slices . ContainsFunc ( existingPosts , func ( reference models . PostReference ) bool {
found := reference . SourcePostID == post . References [ 0 ] . SourcePostID
if found {
if ! slices . Contains ( existingFavPostIDs , models . PostID ( reference . PostID ) ) {
anthroveFaves = append ( anthroveFaves , models . UserFavorite {
UserID : userSource . UserID ,
PostID : models . PostID ( reference . PostID ) ,
UserSourceID : userSource . ID ,
} )
}
}
return found
} ) {
anthroveFaves = append ( anthroveFaves , models . UserFavorite {
UserID : userSource . UserID ,
PostID : post . ID ,
UserSourceID : userSource . ID ,
} )
newPosts = append ( newPosts , post )
}
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Processed posts for favorites and new posts" , trace . WithAttributes ( attribute . Int ( "new_post_count" , len ( newPosts ) ) , attribute . Int ( "new_fav_count" , len ( anthroveFaves ) ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . Info ( "Processed posts for favorites and new posts" )
2024-08-27 07:15:17 +00:00
if len ( newPosts ) > 0 {
2024-08-27 07:49:20 +00:00
err = database . CreatePostInBatch ( ctx , newPosts , BatchSize )
2024-08-27 07:15:17 +00:00
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 07:56:52 +00:00
log . WithContext ( ctx ) . WithError ( err ) . Error ( "Failed to create new posts in batch" )
2024-08-27 10:25:00 +00:00
return BatchSummery { } , err
2024-08-27 07:15:17 +00:00
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Created new posts in batch" , trace . WithAttributes ( attribute . Int ( "batch_size" , BatchSize ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . Info ( "Created new posts in batch" )
2024-08-27 07:15:17 +00:00
}
if len ( anthroveFaves ) > 0 {
2024-08-27 07:49:20 +00:00
err = database . CreateUserFavoriteInBatch ( ctx , anthroveFaves , BatchSize )
2024-08-27 07:15:17 +00:00
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithError ( err ) . WithFields ( BasicLoggingFields ) . Error ( "Failed to create user favorites in batch" )
2024-08-27 10:25:00 +00:00
return BatchSummery { } , err
2024-08-27 07:15:17 +00:00
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Created user favorites in batch" , trace . WithAttributes ( attribute . Int ( "batch_size" , BatchSize ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . Info ( "Created user favorites in batch" )
2024-08-27 07:15:17 +00:00
}
2024-08-27 10:25:00 +00:00
return BatchSummery {
AddedPosts : int64 ( len ( newPosts ) ) ,
AddedFavorites : int64 ( len ( anthroveFaves ) ) ,
} , nil
2024-08-27 07:15:17 +00:00
}
func getAnthrovePost ( ctx context . Context , gorm * gorm . DB , id models . SourceID , postIDs [ ] string ) ( [ ] models . PostReference , error ) {
2024-08-27 07:49:20 +00:00
ctx , span := tracer . Start ( ctx , "getAnthrovePost" )
defer span . End ( )
2024-08-27 07:15:17 +00:00
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . Info ( "Starting getAnthrovePost" )
2024-08-27 07:56:52 +00:00
2024-08-27 07:49:20 +00:00
var existingPosts [ ] models . PostReference
2024-08-27 07:15:17 +00:00
err := gorm . WithContext ( ctx ) . Model ( models . PostReference { } ) . Find ( & existingPosts , "source_id = ? AND source_post_id IN ?" , id , postIDs ) . Error
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithError ( err ) . WithFields ( BasicLoggingFields ) . Error ( "Failed to fetch Anthrove posts" )
2024-08-27 07:15:17 +00:00
return existingPosts , err
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Fetched Anthrove posts" , trace . WithAttributes ( attribute . Int ( "post_count" , len ( existingPosts ) ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . WithField ( "post_count" , len ( existingPosts ) ) . Info ( "Fetched Anthrove posts" )
2024-08-27 07:15:17 +00:00
return existingPosts , nil
}
func getAlreadyFavoritesPostIDs ( ctx context . Context , gorm * gorm . DB , existingPostIDs [ ] models . PostID , userSourceID models . UserSourceID ) ( [ ] models . PostID , error ) {
2024-08-27 07:49:20 +00:00
ctx , span := tracer . Start ( ctx , "getAlreadyFavoritesPostIDs" )
defer span . End ( )
2024-08-27 07:15:17 +00:00
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . Info ( "Starting getAlreadyFavoritesPostIDs" )
2024-08-27 07:56:52 +00:00
2024-08-27 07:49:20 +00:00
var existingFavPostIDS [ ] models . PostID
2024-08-27 07:15:17 +00:00
err := gorm . WithContext ( ctx ) . Model ( & models . UserFavorite { } ) . Select ( "post_id" ) . Find ( & existingFavPostIDS , "user_source_id = ? AND post_id IN ?" , userSourceID , existingPostIDs ) . Error
if err != nil {
2024-08-27 07:49:20 +00:00
span . RecordError ( err )
span . SetStatus ( codes . Error , err . Error ( ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithError ( err ) . WithFields ( BasicLoggingFields ) . Error ( "Failed to fetch already favorite post IDs" )
2024-08-27 07:15:17 +00:00
return existingFavPostIDS , err
}
2024-08-27 07:49:20 +00:00
span . AddEvent ( "Fetched already favorite post IDs" , trace . WithAttributes ( attribute . Int ( "fav_post_count" , len ( existingFavPostIDS ) ) ) )
2024-08-27 08:04:55 +00:00
log . WithContext ( ctx ) . WithFields ( BasicLoggingFields ) . WithField ( "fav_post_count" , len ( existingFavPostIDS ) ) . Info ( "Fetched already favorite post IDs" )
2024-08-27 07:15:17 +00:00
return existingFavPostIDS , nil
}