first commit
This commit is contained in:
314
rabbit.go
Normal file
314
rabbit.go
Normal file
@@ -0,0 +1,314 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
type capturedMsg struct {
|
||||
idx int
|
||||
delivery amqp.Delivery
|
||||
}
|
||||
|
||||
type scanOptions struct {
|
||||
queue string
|
||||
search string
|
||||
maxBody int
|
||||
timeout time.Duration
|
||||
out io.Writer // message payloads
|
||||
diag io.Writer // queue stats + scan summary; kept off stdout so --json pipes cleanly
|
||||
jsonOut bool
|
||||
full bool // include envelope (exchange, routing-key, headers, ...) in human output
|
||||
}
|
||||
|
||||
func scanQueue(ctx context.Context, url string, opts scanOptions) error {
|
||||
if opts.out == nil {
|
||||
opts.out = io.Discard
|
||||
}
|
||||
if opts.diag == nil {
|
||||
opts.diag = io.Discard
|
||||
}
|
||||
if opts.timeout == 0 {
|
||||
opts.timeout = 30 * time.Second
|
||||
}
|
||||
|
||||
conn, err := amqp.DialConfig(url, amqp.Config{
|
||||
Heartbeat: 10 * time.Second,
|
||||
Locale: "en_US",
|
||||
Dial: amqp.DefaultDial(opts.timeout),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect: %w", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
ch, q, err := inspectQueue(conn, opts.queue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
depth := q.Messages
|
||||
fmt.Fprintf(opts.diag, "queue %q depth=%d consumers=%d\n", q.Name, depth, q.Consumers)
|
||||
if depth == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
captured := make([]capturedMsg, 0, depth)
|
||||
|
||||
// Failsafe: requeue in reverse order on any exit path. Channel close also
|
||||
// auto-requeues but explicit reverse-order nacks preserve queue ordering.
|
||||
defer func() {
|
||||
for i := len(captured) - 1; i >= 0; i-- {
|
||||
_ = ch.Nack(captured[i].delivery.DeliveryTag, false, true)
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < depth; i++ {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
msg, ok, err := ch.Get(opts.queue, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("basic.get #%d: %w", i, err)
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
captured = append(captured, capturedMsg{idx: i, delivery: msg})
|
||||
}
|
||||
|
||||
needle := []byte(opts.search)
|
||||
matches := 0
|
||||
for _, c := range captured {
|
||||
if opts.search != "" && !bytes.Contains(c.delivery.Body, needle) {
|
||||
continue
|
||||
}
|
||||
matches++
|
||||
switch {
|
||||
case opts.jsonOut && opts.full:
|
||||
printJSONFull(opts.out, c.idx, c.delivery)
|
||||
case opts.jsonOut:
|
||||
printJSONBody(opts.out, c.delivery)
|
||||
case opts.full:
|
||||
printFull(opts.out, c.idx, c.delivery, opts.maxBody, opts.search)
|
||||
default:
|
||||
printBodyOnly(opts.out, opts.diag, c.idx, c.delivery, opts.maxBody, opts.search)
|
||||
}
|
||||
}
|
||||
|
||||
if opts.search != "" {
|
||||
fmt.Fprintf(opts.diag, "scanned=%d matched=%d\n", len(captured), matches)
|
||||
} else {
|
||||
fmt.Fprintf(opts.diag, "scanned=%d\n", len(captured))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func inspectQueue(conn *amqp.Connection, name string) (*amqp.Channel, amqp.Queue, error) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, amqp.Queue{}, fmt.Errorf("channel: %w", err)
|
||||
}
|
||||
if q, err := ch.QueueDeclarePassive(name, true, false, false, false, nil); err == nil {
|
||||
return ch, q, nil
|
||||
}
|
||||
ch.Close()
|
||||
ch, err = conn.Channel()
|
||||
if err != nil {
|
||||
return nil, amqp.Queue{}, fmt.Errorf("channel: %w", err)
|
||||
}
|
||||
q, err := ch.QueueDeclarePassive(name, false, false, false, false, nil)
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
return nil, amqp.Queue{}, fmt.Errorf("queue inspect: %w", err)
|
||||
}
|
||||
return ch, q, nil
|
||||
}
|
||||
|
||||
// formatBody pretty-prints JSON bodies and decodes Unicode escapes; non-JSON
|
||||
// bodies are returned as-is.
|
||||
func formatBody(body []byte) []byte {
|
||||
trimmed := bytes.TrimSpace(body)
|
||||
if len(trimmed) == 0 || (trimmed[0] != '{' && trimmed[0] != '[') {
|
||||
return body
|
||||
}
|
||||
var v any
|
||||
if err := json.Unmarshal(trimmed, &v); err != nil {
|
||||
return body
|
||||
}
|
||||
pretty, err := json.MarshalIndent(v, "", " ")
|
||||
if err != nil {
|
||||
return body
|
||||
}
|
||||
return pretty
|
||||
}
|
||||
|
||||
func truncate(b []byte, max int) (out []byte, truncated bool) {
|
||||
if max <= 0 || len(b) <= max {
|
||||
return b, false
|
||||
}
|
||||
return b[:max], true
|
||||
}
|
||||
|
||||
// printBodyOnly writes the message body to `w` (stdout-bound) and a per-message
|
||||
// marker to `diag` (stderr-bound) so `rb-search ... | jq` sees a clean stream
|
||||
// of JSON bodies while a human still sees which delivery is which.
|
||||
func printBodyOnly(w io.Writer, diag io.Writer, idx int, d amqp.Delivery, maxBody int, search string) {
|
||||
fmt.Fprintf(diag, "--- #%d tag=%d rk=%s ---\n", idx, d.DeliveryTag, d.RoutingKey)
|
||||
body := formatBody(d.Body)
|
||||
body, truncated := truncate(body, maxBody)
|
||||
w.Write(highlight(body, search))
|
||||
if truncated {
|
||||
fmt.Fprintf(w, "\n... (truncated, use --max-body=0 for full)")
|
||||
}
|
||||
if len(body) == 0 || body[len(body)-1] != '\n' {
|
||||
fmt.Fprintln(w)
|
||||
}
|
||||
}
|
||||
|
||||
func printFull(w io.Writer, idx int, d amqp.Delivery, maxBody int, search string) {
|
||||
fmt.Fprintf(w, "=== #%d tag=%d exchange=%q routing-key=%q\n", idx, d.DeliveryTag, d.Exchange, d.RoutingKey)
|
||||
if d.ContentType != "" || d.ContentEncoding != "" {
|
||||
fmt.Fprintf(w, " content-type=%s content-encoding=%s\n", d.ContentType, d.ContentEncoding)
|
||||
}
|
||||
if d.MessageId != "" || d.CorrelationId != "" {
|
||||
fmt.Fprintf(w, " message-id=%s correlation-id=%s\n", d.MessageId, d.CorrelationId)
|
||||
}
|
||||
if !d.Timestamp.IsZero() {
|
||||
fmt.Fprintf(w, " timestamp=%s\n", d.Timestamp.Format(time.RFC3339))
|
||||
}
|
||||
if len(d.Headers) > 0 {
|
||||
hb, _ := json.Marshal(headersToMap(d.Headers))
|
||||
fmt.Fprintf(w, " headers=%s\n", hb)
|
||||
}
|
||||
|
||||
body := formatBody(d.Body)
|
||||
body, truncated := truncate(body, maxBody)
|
||||
fmt.Fprintf(w, " body(%d bytes):\n", len(d.Body))
|
||||
w.Write(indent(highlight(body, search), " "))
|
||||
if truncated {
|
||||
fmt.Fprintf(w, "\n ... (truncated, use --max-body=0 for full)")
|
||||
}
|
||||
fmt.Fprintln(w)
|
||||
}
|
||||
|
||||
// printJSONBody emits just the message body as a single JSON value per line:
|
||||
// if the body is already valid JSON, it is passed through compactly; otherwise
|
||||
// it is encoded as a JSON string so the stdout stream stays parseable.
|
||||
func printJSONBody(w io.Writer, d amqp.Delivery) {
|
||||
trimmed := bytes.TrimSpace(d.Body)
|
||||
if len(trimmed) > 0 && (trimmed[0] == '{' || trimmed[0] == '[') && json.Valid(trimmed) {
|
||||
var compact bytes.Buffer
|
||||
if err := json.Compact(&compact, trimmed); err == nil {
|
||||
compact.WriteByte('\n')
|
||||
w.Write(compact.Bytes())
|
||||
return
|
||||
}
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(string(d.Body))
|
||||
}
|
||||
|
||||
func printJSONFull(w io.Writer, idx int, d amqp.Delivery) {
|
||||
row := map[string]any{
|
||||
"idx": idx,
|
||||
"delivery_tag": d.DeliveryTag,
|
||||
"exchange": d.Exchange,
|
||||
"routing_key": d.RoutingKey,
|
||||
"content_type": d.ContentType,
|
||||
"content_encoding": d.ContentEncoding,
|
||||
"message_id": d.MessageId,
|
||||
"correlation_id": d.CorrelationId,
|
||||
"headers": headersToMap(d.Headers),
|
||||
"body": bodyAsJSONValue(d.Body),
|
||||
}
|
||||
if !d.Timestamp.IsZero() {
|
||||
row["timestamp"] = d.Timestamp.Format(time.RFC3339)
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(row)
|
||||
}
|
||||
|
||||
// bodyAsJSONValue returns the body parsed as a JSON object/array when possible
|
||||
// so `--json --full` consumers can drill into it with `jq .body.foo` without a
|
||||
// separate `fromjson`. Falls back to the body as a string for non-JSON payloads.
|
||||
func bodyAsJSONValue(body []byte) any {
|
||||
trimmed := bytes.TrimSpace(body)
|
||||
if len(trimmed) > 0 && (trimmed[0] == '{' || trimmed[0] == '[') {
|
||||
var v any
|
||||
if err := json.Unmarshal(trimmed, &v); err == nil {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return string(body)
|
||||
}
|
||||
|
||||
func headersToMap(h amqp.Table) map[string]any {
|
||||
if h == nil {
|
||||
return nil
|
||||
}
|
||||
out := make(map[string]any, len(h))
|
||||
for k, v := range h {
|
||||
out[k] = normalizeAMQPValue(v)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func normalizeAMQPValue(v any) any {
|
||||
switch x := v.(type) {
|
||||
case []byte:
|
||||
return string(x)
|
||||
case amqp.Table:
|
||||
return headersToMap(x)
|
||||
case []any:
|
||||
out := make([]any, len(x))
|
||||
for i, e := range x {
|
||||
out[i] = normalizeAMQPValue(e)
|
||||
}
|
||||
return out
|
||||
default:
|
||||
return v
|
||||
}
|
||||
}
|
||||
|
||||
func indent(b []byte, prefix string) []byte {
|
||||
if len(b) == 0 {
|
||||
return []byte(prefix)
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
buf.WriteString(prefix)
|
||||
for i, line := range bytes.Split(b, []byte{'\n'}) {
|
||||
if i > 0 {
|
||||
buf.WriteByte('\n')
|
||||
buf.WriteString(prefix)
|
||||
}
|
||||
buf.Write(line)
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func highlight(body []byte, search string) []byte {
|
||||
if search == "" {
|
||||
return body
|
||||
}
|
||||
if !bytes.Contains(body, []byte(search)) {
|
||||
return body
|
||||
}
|
||||
const on, off = "\x1b[7m", "\x1b[0m"
|
||||
parts := bytes.Split(body, []byte(search))
|
||||
var buf bytes.Buffer
|
||||
for i, p := range parts {
|
||||
buf.Write(p)
|
||||
if i < len(parts)-1 {
|
||||
buf.WriteString(on)
|
||||
buf.WriteString(search)
|
||||
buf.WriteString(off)
|
||||
}
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
||||
Reference in New Issue
Block a user