Compare commits

..

No commits in common. "main" and "v3.0.0" have entirely different histories.
main ... v3.0.0

7 changed files with 34 additions and 159 deletions

5
go.mod
View File

@ -3,10 +3,9 @@ module git.anthrove.art/Anthrove/plug-sdk/v3
go 1.22.0 go 1.22.0
require ( require (
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0 git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0
github.com/matoous/go-nanoid/v2 v2.1.0 github.com/matoous/go-nanoid/v2 v2.1.0
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0
go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0
go.opentelemetry.io/otel/exporters/prometheus v0.51.0 go.opentelemetry.io/otel/exporters/prometheus v0.51.0
@ -15,7 +14,6 @@ require (
go.opentelemetry.io/otel/trace v1.29.0 go.opentelemetry.io/otel/trace v1.29.0
google.golang.org/grpc v1.65.0 google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2 google.golang.org/protobuf v1.34.2
gorm.io/gorm v1.25.11
) )
require ( require (
@ -52,4 +50,5 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
gorm.io/driver/postgres v1.5.9 // indirect gorm.io/driver/postgres v1.5.9 // indirect
gorm.io/gorm v1.25.11 // indirect
) )

6
go.sum
View File

@ -1,7 +1,7 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0 h1:LYkdMoRhidEzZGwIupB5K6u6+9m20oJSszqsFsjvaeA= git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0 h1:sx/XEd61RWHvitqUqoLmxA8uxqvWoRtaOTBlXl+q6AM=
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0/go.mod h1:TckER0W5rc8PwV3wF+LChcsP1fKy4SD/6AQOMt0W7H4= git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0/go.mod h1:iQiORzbTupfz/C2M3rZMK1hegF1cxTYQ+6cJtDw9qlk=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
@ -145,8 +145,6 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU= go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU=
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM= go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=

View File

@ -1,12 +1,12 @@
package otter package otter
import ( import (
"context" "context"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
) )
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error { func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {
return database.Connect(ctx, config) return database.Connect(ctx, config)
} }

View File

@ -2,10 +2,9 @@ package plug
import ( import (
"context" "context"
"errors"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
gonanoid "github.com/matoous/go-nanoid/v2" gonanoid "github.com/matoous/go-nanoid/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -58,18 +57,6 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
} }
span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId))) span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId)))
if !userSource.AccountValidate {
err = errors.New("user is not validated")
log.WithContext(ctx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
return &plugTaskState, err
}
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting // gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
// by using this method we assign a new context to each new request we get. // by using this method we assign a new context to each new request we get.
// This can be used for example to close the context with the given id // This can be used for example to close the context with the given id
@ -81,7 +68,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
log.WithContext(taskCtx).WithFields(log.Fields{ log.WithContext(taskCtx).WithFields(log.Fields{
"task_id": id, "task_id": id,
"user_source_id": creation.UserSourceId, "user_source_id": creation.UserSourceId,
}).Debug("Starting task") }).Info("Starting task")
go func() { go func() {
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
@ -92,7 +79,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
} else { } else {
log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully") log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully")
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
} }
}() }()
@ -120,7 +107,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
log.WithContext(ctx).WithFields(log.Fields{ log.WithContext(ctx).WithFields(log.Fields{
"task_id": task.TaskId, "task_id": task.TaskId,
"task_state": plugTaskState.TaskState, "task_state": plugTaskState.TaskState,
}).Debug("Task status requested") }).Info("Task status requested")
span.SetAttributes(attribute.String("task_id", task.TaskId)) span.SetAttributes(attribute.String("task_id", task.TaskId))
return &plugTaskState, nil return &plugTaskState, nil
@ -141,7 +128,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
log.WithContext(ctx).WithFields(log.Fields{ log.WithContext(ctx).WithFields(log.Fields{
"task_id": task.TaskId, "task_id": task.TaskId,
"task_state": plugTaskState.TaskState, "task_state": plugTaskState.TaskState,
}).Debug("Task cancellation requested") }).Info("Task cancellation requested")
span.SetAttributes(attribute.String("task_id", task.TaskId)) span.SetAttributes(attribute.String("task_id", task.TaskId))
return &plugTaskState, nil return &plugTaskState, nil
@ -155,96 +142,3 @@ func (s *server) removeTask(taskID string) {
fn() fn()
delete(s.ctx, taskID) delete(s.ctx, taskID)
} }
func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
ctx, span := tracer.Start(ctx, "GetUserMessages")
defer span.End()
userSourceID := models.UserSourceID(message.UserSourceId)
userSource, err := database.GetUserSourceByID(ctx, userSourceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Getting userSource")
return nil, err
}
messages, err := s.getMessageExecution(ctx, userSource)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Execution function")
return nil, err
}
var response gRPC.GetMessagesResponse
for _, message := range messages {
response.Messages = append(response.Messages, &gRPC.Message{
FromUserSourceId: string(userSource.ID),
CreatedAt: message.CreatedAt,
Body: message.Body,
Title: message.Title,
})
}
span.SetAttributes(
attribute.String("user_source_id", string(userSource.ID)),
attribute.String("user_id", string(userSource.UserID)),
attribute.String("source_id", string(userSource.SourceID)),
)
fields := log.Fields{
"user_source_id": userSource.ID,
"user_id": userSource.UserID,
"source_id": userSource.SourceID,
"len_messages": len(messages),
}
log.WithContext(ctx).WithFields(fields).Debug("Got User messages")
return &response, err
}
func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
ctx, span := tracer.Start(ctx, "SendMessage")
defer span.End()
response := &gRPC.SendMessageResponse{
Success: false,
}
sourceID := models.UserSourceID(message.UserSourceId)
userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}}
err := s.sendMessageExecution(ctx, userSource, message.Message)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Sending message execution")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return response, err
}
response.Success = true
return response, err
}
func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) {
ctx, span := tracer.Start(ctx, "Ping")
defer span.End()
var pong gRPC.PongResponse
pong.Message = ping.Message
pong.Timestamp = ping.Timestamp
fields := log.Fields{
"messsage": ping.Message,
"timestamp": ping.Timestamp,
}
log.WithContext(ctx).WithFields(fields).Trace("Got pinged")
return &pong, nil
}

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"slices" "slices"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
@ -16,17 +16,7 @@ import (
var BatchSize = 50 var BatchSize = 50
var BasicLoggingFields log.Fields var BasicLoggingFields log.Fields
type BatchSummery struct {
AddedPosts int64
AddedFavorites int64
}
func BatchPostProcessing(ctx context.Context, userSource models.UserSource, posts []models.Post) error { func BatchPostProcessing(ctx context.Context, userSource models.UserSource, posts []models.Post) error {
_, err := BatchPostProcessingWithSummery(ctx, userSource, posts)
return err
}
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) (BatchSummery, error) {
ctx, span := tracer.Start(ctx, "BatchPostProcessing") ctx, span := tracer.Start(ctx, "BatchPostProcessing")
defer span.End() defer span.End()
@ -49,7 +39,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB") log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB")
return BatchSummery{}, err return err
} }
postIDs := make([]string, 0, len(posts)) postIDs := make([]string, 0, len(posts))
@ -64,7 +54,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts") log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts")
return BatchSummery{}, err return err
} }
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts)))) span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts))))
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts") log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts")
@ -80,7 +70,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts") log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts")
return BatchSummery{}, err return err
} }
span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs)))) span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs))))
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts") log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts")
@ -118,7 +108,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch") log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch")
return BatchSummery{}, err return err
} }
span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch") log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch")
@ -130,16 +120,13 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch") log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch")
return BatchSummery{}, err return err
} }
span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch") log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch")
} }
return BatchSummery{ return nil
AddedPosts: int64(len(newPosts)),
AddedFavorites: int64(len(anthroveFaves)),
}, nil
} }
func getAnthrovePost(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) { func getAnthrovePost(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {

View File

@ -4,10 +4,9 @@ import (
"context" "context"
"net" "net"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
@ -69,9 +68,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
return err return err
} }
grpcServer := grpc.NewServer( grpcServer := grpc.NewServer()
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))

View File

@ -3,8 +3,8 @@ package plug
import ( import (
"context" "context"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
) )