diff --git a/go.mod b/go.mod index ccf8069..1381cb2 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0 github.com/golang/protobuf v1.5.4 github.com/matoous/go-nanoid/v2 v2.1.0 - google.golang.org/grpc v1.61.1 + google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 ) @@ -27,12 +27,12 @@ require ( go.opentelemetry.io/otel/log v0.4.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect - golang.org/x/crypto v0.22.0 // indirect - golang.org/x/net v0.21.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/net v0.25.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.17.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect gorm.io/driver/postgres v1.5.9 // indirect gorm.io/gorm v1.25.11 // indirect ) diff --git a/go.sum b/go.sum index f18f575..a02b076 100644 --- a/go.sum +++ b/go.sum @@ -139,10 +139,10 @@ go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6b go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= -golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -150,10 +150,10 @@ golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= -google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= -google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 4e23af4..a2a2eec 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -2,10 +2,10 @@ package plug import ( "context" - "log" - + "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "google.golang.org/protobuf/types/known/timestamppb" + "log" gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" gonanoid "github.com/matoous/go-nanoid/v2" @@ -31,9 +31,7 @@ func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, se } func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) { - var user models.User var plugTaskState gRPC.PlugTaskStatus - var err error id, err := gonanoid.New() if err != nil { @@ -43,7 +41,10 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) plugTaskState.TaskId = id plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING - user.ID = models.UserID(creation.UserId) + userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId)) + if err != nil { + return nil, err + } // gRPC closes the context after the call ended. So the whole scrapping stopped without waiting // by using this method we assign a new context to each new request we get. @@ -53,7 +54,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) go func() { // FIXME: better implement this methode, works for now but needs refactoring - err := s.taskExecutionFunction(ctx, s.source, creation.UserSourceName, user, creation.DeepScrape, creation.ApiKey, func() { + err := s.taskExecutionFunction(ctx, userSource, creation.DeepScrape, creation.ApiKey, func() { s.removeTask(id) }) if err != nil { @@ -109,7 +110,12 @@ func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongR func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) { messageResponse := gRPC.SendMessageResponse{Success: true} - err := s.sendMessageExecution(ctx, s.source, request.UserSourceId, request.UserSourceName, request.Message) + userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId)) + if err != nil { + return nil, err + } + + err = s.sendMessageExecution(ctx, userSource, request.Message) if err != nil { return nil, err } @@ -120,7 +126,12 @@ func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageReque func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { messageResponse := gRPC.GetMessagesResponse{} - messages, err := s.getMessageExecution(ctx, s.source, request.UserSourceId, request.UserSourceName) + userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId)) + if err != nil { + return nil, err + } + + messages, err := s.getMessageExecution(ctx, userSource) if err != nil { return nil, err } diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 1831328..d7f8890 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -22,9 +22,9 @@ type Message struct { CreatedAt *timestamp.Timestamp } -type TaskExecution func(ctx context.Context, source models.Source, userSourceUsername string, anthroveUser models.User, deepScrape bool, apiKey string, cancelFunction func()) error -type SendMessageExecution func(ctx context.Context, source models.Source, userSourceID string, userSourceUsername string, message string) error -type GetMessageExecution func(ctx context.Context, source models.Source, userSourceID string, userSourceUsername string) ([]Message, error) +type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error +type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error +type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) type Plug struct { address string @@ -49,7 +49,6 @@ func (p *Plug) Listen() error { var err error var source models.Source - log.Print("Check if source exists") source, err = database.GetSourceByDomain(p.ctx, p.source.Domain) if err != nil { if err.Error() == otterError.NoDataFound { @@ -66,6 +65,8 @@ func (p *Plug) Listen() error { log.Print("Source exists") + // Start the Server + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port)) if err != nil { return err