Compare commits

..

No commits in common. "main" and "v3.2.0" have entirely different histories.
main ... v3.2.0

4 changed files with 5 additions and 104 deletions

1
go.mod
View File

@ -6,7 +6,6 @@ require (
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0
github.com/matoous/go-nanoid/v2 v2.1.0
github.com/sirupsen/logrus v1.9.3
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0
go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0
go.opentelemetry.io/otel/exporters/prometheus v0.51.0

2
go.sum
View File

@ -145,8 +145,6 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU=
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=

View File

@ -81,7 +81,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
log.WithContext(taskCtx).WithFields(log.Fields{
"task_id": id,
"user_source_id": creation.UserSourceId,
}).Debug("Starting task")
}).Info("Starting task")
go func() {
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
@ -92,7 +92,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
} else {
log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully")
log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully")
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
}
}()
@ -120,7 +120,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
log.WithContext(ctx).WithFields(log.Fields{
"task_id": task.TaskId,
"task_state": plugTaskState.TaskState,
}).Debug("Task status requested")
}).Info("Task status requested")
span.SetAttributes(attribute.String("task_id", task.TaskId))
return &plugTaskState, nil
@ -141,7 +141,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
log.WithContext(ctx).WithFields(log.Fields{
"task_id": task.TaskId,
"task_state": plugTaskState.TaskState,
}).Debug("Task cancellation requested")
}).Info("Task cancellation requested")
span.SetAttributes(attribute.String("task_id", task.TaskId))
return &plugTaskState, nil
@ -155,96 +155,3 @@ func (s *server) removeTask(taskID string) {
fn()
delete(s.ctx, taskID)
}
func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
ctx, span := tracer.Start(ctx, "GetUserMessages")
defer span.End()
userSourceID := models.UserSourceID(message.UserSourceId)
userSource, err := database.GetUserSourceByID(ctx, userSourceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Getting userSource")
return nil, err
}
messages, err := s.getMessageExecution(ctx, userSource)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Execution function")
return nil, err
}
var response gRPC.GetMessagesResponse
for _, message := range messages {
response.Messages = append(response.Messages, &gRPC.Message{
FromUserSourceId: string(userSource.ID),
CreatedAt: message.CreatedAt,
Body: message.Body,
Title: message.Title,
})
}
span.SetAttributes(
attribute.String("user_source_id", string(userSource.ID)),
attribute.String("user_id", string(userSource.UserID)),
attribute.String("source_id", string(userSource.SourceID)),
)
fields := log.Fields{
"user_source_id": userSource.ID,
"user_id": userSource.UserID,
"source_id": userSource.SourceID,
"len_messages": len(messages),
}
log.WithContext(ctx).WithFields(fields).Debug("Got User messages")
return &response, err
}
func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
ctx, span := tracer.Start(ctx, "SendMessage")
defer span.End()
response := &gRPC.SendMessageResponse{
Success: false,
}
sourceID := models.UserSourceID(message.UserSourceId)
userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}}
err := s.sendMessageExecution(ctx, userSource, message.Message)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Sending message execution")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return response, err
}
response.Success = true
return response, err
}
func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) {
ctx, span := tracer.Start(ctx, "Ping")
defer span.End()
var pong gRPC.PongResponse
pong.Message = ping.Message
pong.Timestamp = ping.Timestamp
fields := log.Fields{
"messsage": ping.Message,
"timestamp": ping.Timestamp,
}
log.WithContext(ctx).WithFields(fields).Trace("Got pinged")
return &pong, nil
}

View File

@ -7,7 +7,6 @@ import (
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@ -69,9 +68,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
return err
}
grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
grpcServer := grpc.NewServer()
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))