Add unified observability type system and Grafana Cloud exporter#13076
Add unified observability type system and Grafana Cloud exporter#13076epinzur wants to merge 2 commits intoesp/obs_core_additionsfrom
Conversation
Implement GrafanaCloudExporter that exports all three observability signals to Grafana Cloud's managed backends: - Traces → Grafana Tempo (via OTLP/HTTP JSON at /v1/traces) - Metrics → Grafana Mimir (via OTLP/HTTP JSON at /otlp/v1/metrics) - Logs → Grafana Loki (via JSON push API at /loki/api/v1/push) Key features: - Extends BaseExporter with onTracingEvent, onLogEvent, onMetricEvent handlers - Configurable batching (batchSize, flushIntervalMs) with automatic periodic flush - Basic auth with instanceId:apiKey for all endpoints - Zone-based default endpoint construction - Env var fallbacks for all configuration (GRAFANA_CLOUD_*) - Re-buffering on transient failures with bounded growth cap - Completion-only pattern for traces (only exports SPAN_ENDED events) - OTLP-compliant span conversion with GenAI semantic attributes - Loki log grouping by stream labels (low-cardinality only) - Smart histogram bucket selection based on metric name 59 unit tests covering formatters and exporter behavior. https://claude.ai/code/session_01V4TiknqfJt5SpaQtxQYpjD
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughIntroduces a unified Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes High number of affected files (70+) with consistent but pervasive pattern changes replacing Possibly related PRs
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/core/src/tools/tool-builder/builder.ts (1)
248-258:⚠️ Potential issue | 🟡 Minor
tracing,loggerVNext, andmetricsleak into...rest(and into logs).The comment on line 248 explains that
tracingContextis excluded because it "may contain sensitive observability credentials." However, the newPartial<ObservabilityContext>fieldstracing(which is the same object astracingContext),loggerVNext, andmetricsare not stripped and will end up inrest, which is later passed tologger.debug(start, { ...rest, ... })at line 472.Proposed fix
const { logger, mastra: _mastra, memory: _memory, requestContext, model, tracingContext: _tracingContext, tracingPolicy: _tracingPolicy, + tracing: _tracing, + loggerVNext: _loggerVNext, + metrics: _metrics, ...rest } = options;
🤖 Fix all issues with AI agents
In `@observability/grafana-cloud/src/exporter.ts`:
- Around line 333-350: sendRequest currently calls fetch without a timeout; add
a configurable timeout property to the exporter class (e.g., timeoutMs) and use
an AbortController inside sendRequest to abort the request after that timeout.
Create the AbortController, pass controller.signal to fetch, set a timer to call
controller.abort() after timeoutMs (and clear the timer on completion), and
catch the abort/timeout case to throw a clear timeout-specific Error before
rethrowing other errors; update any constructor or config parsing to accept
timeoutMs so it can be tuned.
In `@observability/grafana-cloud/src/formatters/logs.ts`:
- Around line 38-62: In buildLogLine, metadata entries can overwrite reserved
fields (message, traceId, spanId, data) because the code copies log.metadata
into entry; update buildLogLine to either skip reserved keys or namespace
metadata keys before merging (e.g., prefix keys with metadata. or put under a
metadata field) by checking each metadata key against the reserved set {message,
traceId, spanId, data} and only adding non-reserved keys (or adding them under
entry.metadata) so existing structured fields set earlier are never overwritten.
🧹 Nitpick comments (17)
packages/core/src/processors/processors/system-prompt-scrubber.ts (1)
103-112: Extra properties leak intoresolveObservabilityContext.After destructuring
partandabort,reststill containsstreamPartsandstate, which get passed intoresolveObservabilityContext. This works correctly at runtime (extra keys are ignored) but reduces clarity about what's actually flowing into the observability resolver.If you want to be explicit:
♻️ Optional: destructure non-observability fields
- const { part, abort, ...rest } = args; - const observabilityContext = resolveObservabilityContext(rest); + const { part, streamParts, state, abort, ...observabilityFields } = args; + const observabilityContext = resolveObservabilityContext(observabilityFields);packages/core/src/processors/processors/moderation.ts (1)
225-226:stateleaks intorest, which is passed toresolveObservabilityContext.
state: Record<string, any>is declared in the args type (line 220) but not destructured here, so it ends up inrestalongside theObservabilityContextpartial fields. WhileresolveObservabilityContextconstructs a fresh object (sostatedoesn't propagate further), destructuring it out keeps the intent clear and the pattern consistent withprocessInput.Suggested fix
- const { part, streamParts, abort, ...rest } = args; + const { part, streamParts, state, abort, ...rest } = args;observability/grafana-cloud/vitest.config.ts (1)
3-9:isolate: falsemay cause test pollution if exporter tests use fake timers or global mocks.The exporter tests likely manipulate timers (for flush intervals/batching). With
isolate: false, all test files share a single worker context, so leaked timer state or global mocks from one test file could affect another. If you observe flaky tests, consider enabling isolation or ensuring proper cleanup inafterEach/afterAllhooks..changeset/thin-knives-accept.md (1)
5-15: Add a code example showing public API usage.Per the changeset guidelines, new features should include a code example demonstrating the public API. The changeset introduces
createObservabilityContext(),resolveObservabilityContext(), and the newMastragetters but doesn't show how a developer would use them. A brief example (e.g., accessingmastra.loggerVNextor creating a context) would help users understand the change.Also, verify that a separate changeset exists for the
@mastra/grafana-cloudpackage, since this changeset's frontmatter only covers@mastra/core.As per coding guidelines: "If the change is a breaking change or is adding a new feature, ensure that a code example is provided. This code example should show the public API usage."
packages/core/src/processors/processors/pii-detector.ts (1)
581-602:streamPartsandstateleak intorestpassed toresolveObservabilityContext.In
processOutputStream, the destructuringconst { part, abort, ...rest } = argsleavesstreamPartsandstateinrestalongside theObservabilityContextfields. This is functionally harmless (the resolver ignores unknown keys), but it's less precise than the other methods. Consider explicitly destructuring the unused fields for clarity.♻️ Suggested cleanup
- const { part, abort, ...rest } = args; + const { part, streamParts: _, state: _state, abort, ...rest } = args; const observabilityContext = resolveObservabilityContext(rest);packages/core/src/processors/runner.ts (2)
780-790: Stale JSDoc:@param args.tracingContextno longer exists in the signature.Line 783 references
args.tracingContextbut the parameter is nowPartial<ObservabilityContext>spread intoargs. Update to reflect the new shape.
988-1010: Same stale JSDoc at line 992 referencing@param args.tracingContext.The
runProcessOutputStepJSDoc still documentstracingContextas a named parameter, but the actual signature uses& Partial<ObservabilityContext>.packages/core/src/observability/context.ts (1)
20-20: Pre-existing: duplicate'createRun'entry and redundant condition.
WORKFLOW_METHODS_TO_WRAPlists'createRun'twice (line 20), and line 150 checksprop === 'createRun' || prop === 'createRun'— both sides are identical. Not introduced by this PR, but worth cleaning up when convenient.♻️ Suggested cleanup
-const WORKFLOW_METHODS_TO_WRAP = ['execute', 'createRun', 'createRun']; +const WORKFLOW_METHODS_TO_WRAP = ['execute', 'createRun'];- if (prop === 'createRun' || prop === 'createRun') { + if (prop === 'createRun') {Also applies to: 148-155
packages/core/src/evals/run/index.ts (1)
267-365:resolveObservabilityContext(item)is called repeatedly for the same item inrunScorers.Lines 282, 312, and 334 each call
resolveObservabilityContext(item)for the sameitemwithin the same function invocation. Consider resolving once at the top ofrunScorersand reusing it.♻️ Suggested refactor
async function runScorers( scorers: MastraScorer<any, any, any, any>[] | WorkflowScorerConfig, targetResult: any, item: RunEvalsDataItem<any>, ): Promise<Record<string, any>> { const scorerResults: Record<string, any> = {}; + const observabilityContext = resolveObservabilityContext(item); if (Array.isArray(scorers)) { for (const scorer of scorers) { try { const score = await scorer.run({ input: targetResult.scoringData?.input, output: targetResult.scoringData?.output, groundTruth: item.groundTruth, requestContext: item.requestContext, - ...resolveObservabilityContext(item), + ...observabilityContext, });Apply the same replacement at the other two call sites (lines 312 and 334).
packages/core/src/processors/processors/structured-output.ts (1)
87-88:retryCountleaks intorestand is passed toresolveObservabilityContext.After destructuring
{ part, state, streamParts, abort, ...rest }, therestobject still containsretryCount(from theargstype). WhileresolveObservabilityContextsafely ignores unknown fields, it's slightly imprecise. Consider also destructuringretryCountto keep the intent clear.♻️ Suggested fix
- const { part, state, streamParts, abort, ...rest } = args; + const { part, state, streamParts, abort, retryCount, ...rest } = args;observability/grafana-cloud/src/formatters/traces.ts (1)
164-171: Metadata values that aren'tstring | number | booleanwill be silently cast.If
span.metadatacontains a value like abigint,symbol, orfunction, thetypeof v === 'object'check won't stringify it, and theas string | number | booleancast masks the type mismatch.kvwould then produce an incorrect OTLP value (e.g.,{ doubleValue: NaN }for a symbol).Consider adding a
typeofguard for the non-object branch to ensure only primitive-safe values are forwarded:Suggested tightening
if (v === null || v === undefined) continue; - const val = typeof v === 'object' ? JSON.stringify(v) : v; - attrs.push(kv(`mastra.metadata.${k}`, val as string | number | boolean)); + const val = typeof v === 'object' ? JSON.stringify(v) + : typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' + ? v + : String(v); + attrs.push(kv(`mastra.metadata.${k}`, val));observability/grafana-cloud/src/formatters/logs.ts (2)
75-96:entity_nameas a Loki stream label may cause high-cardinality issues.Loki strongly penalizes high-cardinality label sets. If
entityNamevaries per request (e.g., dynamically named agents or unique user IDs), this will create excessive streams and degrade query performance. Consider whetherentity_nameshould remain a label or be moved into the log line body (where it's still searchable via LogQL filters).
30-32:dateToNanoStringis duplicated acrosstraces.tsandlogs.ts.Both formatters define the same function. Consider extracting it to a shared utility module (e.g.,
formatters/utils.ts).observability/grafana-cloud/src/exporter.ts (2)
159-164: AvoidObject.definePropertyto bypassreadonly— use a private mutable field instead.Using
Object.definePropertyto mutate areadonlyfield (line 162) undermines TypeScript's compile-time guarantee and is surprising to future readers. A cleaner approach is a private mutable field with thereadonlymodifier removed, or a separate_serviceNamebacking field.♻️ Suggested approach
- private readonly serviceName: string; + private serviceName: string; override init(options: InitExporterOptions): void { if (options.config?.serviceName && this.serviceName === DEFAULTS.serviceName) { - Object.defineProperty(this, 'serviceName', { value: options.config.serviceName }); + this.serviceName = options.config.serviceName; } }
78-108: Consider extracting the disabled-state initialization to reduce duplication.The "missing instanceId" and "missing apiKey" blocks are nearly identical (21 lines each). A small private helper or early-return pattern would halve this code.
♻️ Example: extract helper
+ private initDisabled(reason: string): void { + this.instanceId = ''; + this.apiKey = ''; + this.authHeader = ''; + this.tempoEndpoint = ''; + this.mimirEndpoint = ''; + this.lokiEndpoint = ''; + this.serviceName = DEFAULTS.serviceName; + this.batchSize = DEFAULTS.batchSize; + this.flushIntervalMs = DEFAULTS.flushIntervalMs; + this.setDisabled(reason); + } // Then in constructor: if (!instanceId) { - this.instanceId = ''; - this.apiKey = ''; - ... (9 more lines) + this.initDisabled('Missing instanceId. Set GRAFANA_CLOUD_INSTANCE_ID env var or pass instanceId in config.'); return; } if (!apiKey) { + this.initDisabled('Missing apiKey. Set GRAFANA_CLOUD_API_KEY env var or pass apiKey in config.'); - this.instanceId = ''; - ... (9 more lines) return; }packages/core/src/observability/types/core.ts (1)
370-426:ObservabilityExporter— consider makingexportTracingEventoptional or documenting its relationship withonTracingEvent.The interface has both
onTracingEvent?(optional) andexportTracingEvent(required). TheGrafanaCloudExporterimplementation shows_exportTracingEventdelegating toonTracingEvent, which suggestsexportTracingEventis a legacy path. This dual-path design is fine for backward compatibility, but a brief JSDoc note onexportTracingEventexplaining its relationship toonTracingEvent(e.g., "Legacy path; prefer implementingonTracingEventfor new exporters") would help implementers.packages/core/src/agent/agent.ts (1)
2199-2244: Minor: variable shadowing ofrestinsidelistClientTools.The outer
...rest(line 2207) captures theObservabilityContextfields from the function params, while the innerconst { execute, ...rest } = tool(line 2227) shadows it within the loop body. There's no functional bug sinceobservabilityContextis already resolved on line 2217, but the shadowing can confuse readers.Consider renaming one of the two for clarity, e.g.,
toolRestortoolWithoutExecutefor the inner variable.✏️ Optional rename to avoid shadowing
- const { execute, ...rest } = tool; + const { execute, ...toolDef } = tool; const options: ToolOptions = { ... }; - const convertedToCoreTool = makeCoreTool(rest, options, 'client-tool', autoResumeSuspendedTools); + const convertedToCoreTool = makeCoreTool(toolDef, options, 'client-tool', autoResumeSuspendedTools);
| private async sendRequest(url: string, body: unknown): Promise<void> { | ||
| const response = await fetch(url, { | ||
| method: 'POST', | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| Authorization: this.authHeader, | ||
| 'X-Scope-OrgID': this.instanceId, | ||
| }, | ||
| body: JSON.stringify(body), | ||
| }); | ||
|
|
||
| if (!response.ok) { | ||
| const responseText = await response.text().catch(() => '(no body)'); | ||
| throw new Error( | ||
| `Grafana Cloud API error: ${response.status} ${response.statusText} - ${responseText}`, | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Add a timeout to fetch calls to avoid indefinite hangs.
sendRequest makes HTTP calls without a timeout or AbortSignal. If Grafana Cloud is unreachable or slow, this will block the flush indefinitely — particularly problematic in serverless environments where you have limited execution time, and during shutdown() which awaits flush().
⏱️ Proposed fix: add a configurable timeout
+ private readonly requestTimeoutMs: number;
+
// In constructor:
+ this.requestTimeoutMs = config.requestTimeoutMs ?? 30_000;
private async sendRequest(url: string, body: unknown): Promise<void> {
+ const controller = new AbortController();
+ const timeout = setTimeout(() => controller.abort(), this.requestTimeoutMs);
+
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: this.authHeader,
'X-Scope-OrgID': this.instanceId,
},
body: JSON.stringify(body),
+ signal: controller.signal,
- });
+ }).finally(() => clearTimeout(timeout));
if (!response.ok) {
const responseText = await response.text().catch(() => '(no body)');
throw new Error(
`Grafana Cloud API error: ${response.status} ${response.statusText} - ${responseText}`,
);
}
}🤖 Prompt for AI Agents
In `@observability/grafana-cloud/src/exporter.ts` around lines 333 - 350,
sendRequest currently calls fetch without a timeout; add a configurable timeout
property to the exporter class (e.g., timeoutMs) and use an AbortController
inside sendRequest to abort the request after that timeout. Create the
AbortController, pass controller.signal to fetch, set a timer to call
controller.abort() after timeoutMs (and clear the timer on completion), and
catch the abort/timeout case to throw a clear timeout-specific Error before
rethrowing other errors; update any constructor or config parsing to accept
timeoutMs so it can be tuned.
| function buildLogLine(log: ExportedLog): string { | ||
| const entry: Record<string, unknown> = { | ||
| message: log.message, | ||
| }; | ||
|
|
||
| // Add trace correlation | ||
| if (log.traceId) entry.traceId = log.traceId; | ||
| if (log.spanId) entry.spanId = log.spanId; | ||
|
|
||
| // Add structured data | ||
| if (log.data && Object.keys(log.data).length > 0) { | ||
| entry.data = log.data; | ||
| } | ||
|
|
||
| // Add metadata fields (runId, sessionId, userId, etc.) | ||
| if (log.metadata && Object.keys(log.metadata).length > 0) { | ||
| for (const [k, v] of Object.entries(log.metadata)) { | ||
| if (v !== undefined && v !== null) { | ||
| entry[k] = v; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return JSON.stringify(entry); | ||
| } |
There was a problem hiding this comment.
Metadata keys can silently overwrite reserved log-line fields.
buildLogLine spreads log.metadata entries directly into entry (lines 54-58). If metadata contains a key like message, traceId, spanId, or data, it will overwrite the structured fields set earlier, corrupting the log line.
Consider namespacing metadata or skipping reserved keys:
Option: skip reserved keys
+ const RESERVED_KEYS = new Set(['message', 'traceId', 'spanId', 'data']);
+
// Add metadata fields (runId, sessionId, userId, etc.)
if (log.metadata && Object.keys(log.metadata).length > 0) {
for (const [k, v] of Object.entries(log.metadata)) {
- if (v !== undefined && v !== null) {
+ if (v !== undefined && v !== null && !RESERVED_KEYS.has(k)) {
entry[k] = v;
}
}
}🤖 Prompt for AI Agents
In `@observability/grafana-cloud/src/formatters/logs.ts` around lines 38 - 62, In
buildLogLine, metadata entries can overwrite reserved fields (message, traceId,
spanId, data) because the code copies log.metadata into entry; update
buildLogLine to either skip reserved keys or namespace metadata keys before
merging (e.g., prefix keys with metadata. or put under a metadata field) by
checking each metadata key against the reserved set {message, traceId, spanId,
data} and only adding non-reserved keys (or adding them under entry.metadata) so
existing structured fields set earlier are never overwritten.
…lf-hosted Rename the package from @mastra/grafana-cloud to @mastra/grafana since the exporter works with both Grafana Cloud and self-hosted Grafana stack (Tempo, Mimir, Loki) — the wire protocols are identical. Key changes: - Rename GrafanaCloudExporter → GrafanaExporter - Add GrafanaAuth union type (basic, bearer, custom, none) - Add grafanaCloud() config helper for Cloud setup (zone-based endpoints, Basic auth with instanceId:apiKey) - Add grafana() config helper for self-hosted setup (direct endpoints, flexible auth) - Per-signal endpoint gating: skip signals with no endpoint configured - X-Scope-OrgID header only sent when tenantId is set - Export GrafanaCloudConfig, GrafanaSelfHostedConfig, GrafanaAuth types - Add README.md with usage examples for both deployment modes 74 passing tests (up from 59). https://claude.ai/code/session_01V4TiknqfJt5SpaQtxQYpjD
Description
This PR introduces a comprehensive observability infrastructure for Mastra with a unified type system and a production-ready Grafana Cloud exporter.
Core Changes
Observability Type System (
packages/core/src/observability/types/)core.ts: Top-level observability infrastructure types includingObservabilityContext(unified interface for tracing, logging, and metrics),ObservabilityEventBus,ObservabilityInstance, and configuration typeslogging.ts:LoggerContextinterface for structured logging with trace correlationmetrics.ts:MetricsContextinterface supporting counters, gauges, and histogramsscores.tsandfeedback.ts: Types for post-hoc span/trace annotation with scores and feedbacktracing.ts: IntroducedSpanDatabase type,RecordedSpanfor persisted spans with annotation methods, andTracefor trace-level operationsContext Factory (
packages/core/src/observability/context-factory.ts)createObservabilityContext(): Creates observability contexts with optional tracing, logging, and metricsresolveObservabilityContext(): Extracts observability context from partial objects, enabling flexible parameter passingGrafana Cloud Exporter (
observability/grafana-cloud/)@mastra/grafana-cloudexporting traces to Tempo, metrics to Mimir, and logs to Loki via OTLP/HTTP JSONGrafanaCloudExporter: Implements batching, configurable flush intervals, and Basic auth withinstanceId:apiKeyformatSpansForTempo(): Converts spans to OTLP trace formatformatMetricsForMimir(): Converts metrics to OTLP metrics formatformatLogsForLoki(): Converts logs to Loki JSON push API formatIntegration Updates
Updated core modules to use the new
ObservabilityContextinterface:Agent,AgentLegacy: UseresolveObservabilityContext()to extract observability from parametersWorkflow,WorkflowEvented: Pass observability context through step executionProcessorRunner: Derives logger and metrics from current spanObservabilityContextinstead of justTracingContextBackward Compatibility
TracingContexttype remains unchangedObservabilityContextprovidestracingContextalias for compatibilityType of Change
Checklist
https://claude.ai/code/session_01V4TiknqfJt5SpaQtxQYpjD
Summary by CodeRabbit
New Features
loggerVNextandmetricsgetters to Mastra class for non-traced logging and metrics collection.Refactor