package main import ( "bytes" "context" "encoding/json" "fmt" "io" "time" amqp "github.com/rabbitmq/amqp091-go" ) type capturedMsg struct { idx int delivery amqp.Delivery } type scanOptions struct { queue string search string maxBody int timeout time.Duration out io.Writer // message payloads diag io.Writer // queue stats + scan summary; kept off stdout so --json pipes cleanly jsonOut bool full bool // include envelope (exchange, routing-key, headers, ...) in human output } func scanQueue(ctx context.Context, url string, opts scanOptions) error { if opts.out == nil { opts.out = io.Discard } if opts.diag == nil { opts.diag = io.Discard } if opts.timeout == 0 { opts.timeout = 30 * time.Second } conn, err := amqp.DialConfig(url, amqp.Config{ Heartbeat: 10 * time.Second, Locale: "en_US", Dial: amqp.DefaultDial(opts.timeout), }) if err != nil { return fmt.Errorf("connect: %w", err) } defer conn.Close() ch, q, err := inspectQueue(conn, opts.queue) if err != nil { return err } defer ch.Close() depth := q.Messages fmt.Fprintf(opts.diag, "queue %q depth=%d consumers=%d\n", q.Name, depth, q.Consumers) if depth == 0 { return nil } captured := make([]capturedMsg, 0, depth) // Failsafe: requeue in reverse order on any exit path. Channel close also // auto-requeues but explicit reverse-order nacks preserve queue ordering. defer func() { for i := len(captured) - 1; i >= 0; i-- { _ = ch.Nack(captured[i].delivery.DeliveryTag, false, true) } }() for i := 0; i < depth; i++ { if err := ctx.Err(); err != nil { return err } msg, ok, err := ch.Get(opts.queue, false) if err != nil { return fmt.Errorf("basic.get #%d: %w", i, err) } if !ok { break } captured = append(captured, capturedMsg{idx: i, delivery: msg}) } needle := []byte(opts.search) matches := 0 for _, c := range captured { if opts.search != "" && !bytes.Contains(c.delivery.Body, needle) { continue } matches++ switch { case opts.jsonOut && opts.full: printJSONFull(opts.out, c.idx, c.delivery) case opts.jsonOut: printJSONBody(opts.out, c.delivery) case opts.full: printFull(opts.out, c.idx, c.delivery, opts.maxBody, opts.search) default: printBodyOnly(opts.out, opts.diag, c.idx, c.delivery, opts.maxBody, opts.search) } } if opts.search != "" { fmt.Fprintf(opts.diag, "scanned=%d matched=%d\n", len(captured), matches) } else { fmt.Fprintf(opts.diag, "scanned=%d\n", len(captured)) } return nil } func inspectQueue(conn *amqp.Connection, name string) (*amqp.Channel, amqp.Queue, error) { ch, err := conn.Channel() if err != nil { return nil, amqp.Queue{}, fmt.Errorf("channel: %w", err) } if q, err := ch.QueueDeclarePassive(name, true, false, false, false, nil); err == nil { return ch, q, nil } ch.Close() ch, err = conn.Channel() if err != nil { return nil, amqp.Queue{}, fmt.Errorf("channel: %w", err) } q, err := ch.QueueDeclarePassive(name, false, false, false, false, nil) if err != nil { ch.Close() return nil, amqp.Queue{}, fmt.Errorf("queue inspect: %w", err) } return ch, q, nil } // formatBody pretty-prints JSON bodies and decodes Unicode escapes; non-JSON // bodies are returned as-is. func formatBody(body []byte) []byte { trimmed := bytes.TrimSpace(body) if len(trimmed) == 0 || (trimmed[0] != '{' && trimmed[0] != '[') { return body } var v any if err := json.Unmarshal(trimmed, &v); err != nil { return body } pretty, err := json.MarshalIndent(v, "", " ") if err != nil { return body } return pretty } func truncate(b []byte, max int) (out []byte, truncated bool) { if max <= 0 || len(b) <= max { return b, false } return b[:max], true } // printBodyOnly writes the message body to `w` (stdout-bound) and a per-message // marker to `diag` (stderr-bound) so `rb-search ... | jq` sees a clean stream // of JSON bodies while a human still sees which delivery is which. func printBodyOnly(w io.Writer, diag io.Writer, idx int, d amqp.Delivery, maxBody int, search string) { fmt.Fprintf(diag, "--- #%d tag=%d rk=%s ---\n", idx, d.DeliveryTag, d.RoutingKey) body := formatBody(d.Body) body, truncated := truncate(body, maxBody) w.Write(highlight(body, search)) if truncated { fmt.Fprintf(w, "\n... (truncated, use --max-body=0 for full)") } if len(body) == 0 || body[len(body)-1] != '\n' { fmt.Fprintln(w) } } func printFull(w io.Writer, idx int, d amqp.Delivery, maxBody int, search string) { fmt.Fprintf(w, "=== #%d tag=%d exchange=%q routing-key=%q\n", idx, d.DeliveryTag, d.Exchange, d.RoutingKey) if d.ContentType != "" || d.ContentEncoding != "" { fmt.Fprintf(w, " content-type=%s content-encoding=%s\n", d.ContentType, d.ContentEncoding) } if d.MessageId != "" || d.CorrelationId != "" { fmt.Fprintf(w, " message-id=%s correlation-id=%s\n", d.MessageId, d.CorrelationId) } if !d.Timestamp.IsZero() { fmt.Fprintf(w, " timestamp=%s\n", d.Timestamp.Format(time.RFC3339)) } if len(d.Headers) > 0 { hb, _ := json.Marshal(headersToMap(d.Headers)) fmt.Fprintf(w, " headers=%s\n", hb) } body := formatBody(d.Body) body, truncated := truncate(body, maxBody) fmt.Fprintf(w, " body(%d bytes):\n", len(d.Body)) w.Write(indent(highlight(body, search), " ")) if truncated { fmt.Fprintf(w, "\n ... (truncated, use --max-body=0 for full)") } fmt.Fprintln(w) } // printJSONBody emits just the message body as a single JSON value per line: // if the body is already valid JSON, it is passed through compactly; otherwise // it is encoded as a JSON string so the stdout stream stays parseable. func printJSONBody(w io.Writer, d amqp.Delivery) { trimmed := bytes.TrimSpace(d.Body) if len(trimmed) > 0 && (trimmed[0] == '{' || trimmed[0] == '[') && json.Valid(trimmed) { var compact bytes.Buffer if err := json.Compact(&compact, trimmed); err == nil { compact.WriteByte('\n') w.Write(compact.Bytes()) return } } _ = json.NewEncoder(w).Encode(string(d.Body)) } func printJSONFull(w io.Writer, idx int, d amqp.Delivery) { row := map[string]any{ "idx": idx, "delivery_tag": d.DeliveryTag, "exchange": d.Exchange, "routing_key": d.RoutingKey, "content_type": d.ContentType, "content_encoding": d.ContentEncoding, "message_id": d.MessageId, "correlation_id": d.CorrelationId, "headers": headersToMap(d.Headers), "body": bodyAsJSONValue(d.Body), } if !d.Timestamp.IsZero() { row["timestamp"] = d.Timestamp.Format(time.RFC3339) } _ = json.NewEncoder(w).Encode(row) } // bodyAsJSONValue returns the body parsed as a JSON object/array when possible // so `--json --full` consumers can drill into it with `jq .body.foo` without a // separate `fromjson`. Falls back to the body as a string for non-JSON payloads. func bodyAsJSONValue(body []byte) any { trimmed := bytes.TrimSpace(body) if len(trimmed) > 0 && (trimmed[0] == '{' || trimmed[0] == '[') { var v any if err := json.Unmarshal(trimmed, &v); err == nil { return v } } return string(body) } func headersToMap(h amqp.Table) map[string]any { if h == nil { return nil } out := make(map[string]any, len(h)) for k, v := range h { out[k] = normalizeAMQPValue(v) } return out } func normalizeAMQPValue(v any) any { switch x := v.(type) { case []byte: return string(x) case amqp.Table: return headersToMap(x) case []any: out := make([]any, len(x)) for i, e := range x { out[i] = normalizeAMQPValue(e) } return out default: return v } } func indent(b []byte, prefix string) []byte { if len(b) == 0 { return []byte(prefix) } var buf bytes.Buffer buf.WriteString(prefix) for i, line := range bytes.Split(b, []byte{'\n'}) { if i > 0 { buf.WriteByte('\n') buf.WriteString(prefix) } buf.Write(line) } return buf.Bytes() } func highlight(body []byte, search string) []byte { if search == "" { return body } if !bytes.Contains(body, []byte(search)) { return body } const on, off = "\x1b[7m", "\x1b[0m" parts := bytes.Split(body, []byte(search)) var buf bytes.Buffer for i, p := range parts { buf.Write(p) if i < len(parts)-1 { buf.WriteString(on) buf.WriteString(search) buf.WriteString(off) } } return buf.Bytes() }