Compare commits

...

9 Commits
v3.1.0 ... main

Author SHA1 Message Date
38cecba455 fix(grpc): spelling
All checks were successful
Gitea Build Check / Build (push) Successful in 48s
Gitea Build Check / Build (pull_request) Successful in 50s
2024-09-04 14:56:27 +02:00
b497348d2a feat(grpc): add logging
All checks were successful
Gitea Build Check / Build (push) Successful in 51s
Gitea Build Check / Build (pull_request) Successful in 47s
2024-09-04 14:53:01 +02:00
2c8d6bd682 fix(grpc): get necessary information
All checks were successful
Gitea Build Check / Build (push) Successful in 51s
Gitea Build Check / Build (pull_request) Successful in 53s
getting data from database for the userSource
2024-09-04 14:17:00 +02:00
601ce9eae7 feat(grpc): added GetUserMessages & SendMessage
All checks were successful
Gitea Build Check / Build (push) Successful in 50s
Gitea Build Check / Build (pull_request) Successful in 53s
finalized first draft for the endpoint integration
2024-09-04 14:01:04 +02:00
42954c7cf1 feat(grpc): added ping
added ping function
2024-09-04 13:41:29 +02:00
909e84fe1c feat(grpc): added middleware
All checks were successful
Gitea Build Check / Build (push) Successful in 42s
Gitea Build Check / Build (pull_request) Successful in 44s
added the ability to receive otel data from clients
2024-08-30 15:20:32 +02:00
76174e3247 feat(dependencies): update to latest version
All checks were successful
Gitea Build Check / Build (push) Successful in 51s
Gitea Build Check / Build (pull_request) Successful in 51s
updated OtterSpaceSDK to v4.0.0
2024-08-29 15:26:58 +02:00
ea41bef942 fix: move before new context gets created
All checks were successful
Gitea Build Check / Build (push) Successful in 39s
Gitea Build Check / Build (pull_request) Successful in 47s
2024-08-27 14:21:33 +02:00
caef31f48f feat: Add validation check for user account before starting task
All checks were successful
Gitea Build Check / Build (push) Successful in 43s
Gitea Build Check / Build (pull_request) Successful in 45s
2024-08-27 14:17:31 +02:00
7 changed files with 140 additions and 28 deletions

5
go.mod
View File

@ -3,9 +3,10 @@ module git.anthrove.art/Anthrove/plug-sdk/v3
go 1.22.0 go 1.22.0
require ( require (
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0 git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0
github.com/matoous/go-nanoid/v2 v2.1.0 github.com/matoous/go-nanoid/v2 v2.1.0
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0
go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0
go.opentelemetry.io/otel/exporters/prometheus v0.51.0 go.opentelemetry.io/otel/exporters/prometheus v0.51.0
@ -14,6 +15,7 @@ require (
go.opentelemetry.io/otel/trace v1.29.0 go.opentelemetry.io/otel/trace v1.29.0
google.golang.org/grpc v1.65.0 google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2 google.golang.org/protobuf v1.34.2
gorm.io/gorm v1.25.11
) )
require ( require (
@ -50,5 +52,4 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
gorm.io/driver/postgres v1.5.9 // indirect gorm.io/driver/postgres v1.5.9 // indirect
gorm.io/gorm v1.25.11 // indirect
) )

6
go.sum
View File

@ -1,7 +1,7 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0 h1:sx/XEd61RWHvitqUqoLmxA8uxqvWoRtaOTBlXl+q6AM= git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0 h1:LYkdMoRhidEzZGwIupB5K6u6+9m20oJSszqsFsjvaeA=
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0/go.mod h1:iQiORzbTupfz/C2M3rZMK1hegF1cxTYQ+6cJtDw9qlk= git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0/go.mod h1:TckER0W5rc8PwV3wF+LChcsP1fKy4SD/6AQOMt0W7H4=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
@ -145,6 +145,8 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU= go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU=
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM= go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=

View File

@ -3,8 +3,8 @@ package otter
import ( import (
"context" "context"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
) )
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error { func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {

View File

@ -2,9 +2,10 @@ package plug
import ( import (
"context" "context"
"errors"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
gonanoid "github.com/matoous/go-nanoid/v2" gonanoid "github.com/matoous/go-nanoid/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -57,6 +58,18 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
} }
span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId))) span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId)))
if !userSource.AccountValidate {
err = errors.New("user is not validated")
log.WithContext(ctx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
return &plugTaskState, err
}
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting // gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
// by using this method we assign a new context to each new request we get. // by using this method we assign a new context to each new request we get.
// This can be used for example to close the context with the given id // This can be used for example to close the context with the given id
@ -68,7 +81,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
log.WithContext(taskCtx).WithFields(log.Fields{ log.WithContext(taskCtx).WithFields(log.Fields{
"task_id": id, "task_id": id,
"user_source_id": creation.UserSourceId, "user_source_id": creation.UserSourceId,
}).Info("Starting task") }).Debug("Starting task")
go func() { go func() {
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
@ -79,7 +92,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
} else { } else {
log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully") log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully")
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
} }
}() }()
@ -107,7 +120,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
log.WithContext(ctx).WithFields(log.Fields{ log.WithContext(ctx).WithFields(log.Fields{
"task_id": task.TaskId, "task_id": task.TaskId,
"task_state": plugTaskState.TaskState, "task_state": plugTaskState.TaskState,
}).Info("Task status requested") }).Debug("Task status requested")
span.SetAttributes(attribute.String("task_id", task.TaskId)) span.SetAttributes(attribute.String("task_id", task.TaskId))
return &plugTaskState, nil return &plugTaskState, nil
@ -128,7 +141,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
log.WithContext(ctx).WithFields(log.Fields{ log.WithContext(ctx).WithFields(log.Fields{
"task_id": task.TaskId, "task_id": task.TaskId,
"task_state": plugTaskState.TaskState, "task_state": plugTaskState.TaskState,
}).Info("Task cancellation requested") }).Debug("Task cancellation requested")
span.SetAttributes(attribute.String("task_id", task.TaskId)) span.SetAttributes(attribute.String("task_id", task.TaskId))
return &plugTaskState, nil return &plugTaskState, nil
@ -142,3 +155,96 @@ func (s *server) removeTask(taskID string) {
fn() fn()
delete(s.ctx, taskID) delete(s.ctx, taskID)
} }
func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
ctx, span := tracer.Start(ctx, "GetUserMessages")
defer span.End()
userSourceID := models.UserSourceID(message.UserSourceId)
userSource, err := database.GetUserSourceByID(ctx, userSourceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Getting userSource")
return nil, err
}
messages, err := s.getMessageExecution(ctx, userSource)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Execution function")
return nil, err
}
var response gRPC.GetMessagesResponse
for _, message := range messages {
response.Messages = append(response.Messages, &gRPC.Message{
FromUserSourceId: string(userSource.ID),
CreatedAt: message.CreatedAt,
Body: message.Body,
Title: message.Title,
})
}
span.SetAttributes(
attribute.String("user_source_id", string(userSource.ID)),
attribute.String("user_id", string(userSource.UserID)),
attribute.String("source_id", string(userSource.SourceID)),
)
fields := log.Fields{
"user_source_id": userSource.ID,
"user_id": userSource.UserID,
"source_id": userSource.SourceID,
"len_messages": len(messages),
}
log.WithContext(ctx).WithFields(fields).Debug("Got User messages")
return &response, err
}
func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
ctx, span := tracer.Start(ctx, "SendMessage")
defer span.End()
response := &gRPC.SendMessageResponse{
Success: false,
}
sourceID := models.UserSourceID(message.UserSourceId)
userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}}
err := s.sendMessageExecution(ctx, userSource, message.Message)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Sending message execution")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return response, err
}
response.Success = true
return response, err
}
func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) {
ctx, span := tracer.Start(ctx, "Ping")
defer span.End()
var pong gRPC.PongResponse
pong.Message = ping.Message
pong.Timestamp = ping.Timestamp
fields := log.Fields{
"messsage": ping.Message,
"timestamp": ping.Timestamp,
}
log.WithContext(ctx).WithFields(fields).Trace("Got pinged")
return &pong, nil
}

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"slices" "slices"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"

View File

@ -4,9 +4,10 @@ import (
"context" "context"
"net" "net"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
@ -68,7 +69,9 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
return err return err
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))

View File

@ -3,8 +3,8 @@ package plug
import ( import (
"context" "context"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
) )