From 7f9be47039cb661052a923cad811cbf4c209f728 Mon Sep 17 00:00:00 2001 From: Alex Shlyakhov Date: Tue, 26 May 2026 13:29:52 +0500 Subject: [PATCH] first commit --- .gitignore | 3 + .idea/.gitignore | 10 + .idea/copilot.data.migration.ask2agent.xml | 6 + .idea/go.imports.xml | 11 + .idea/modules.xml | 8 + .idea/php.xml | 19 ++ .idea/rb-search.iml | 9 + .idea/vcs.xml | 6 + README.md | 241 ++++++++++++++++ config.go | 152 ++++++++++ go.mod | 5 + go.sum | 4 + main.go | 210 ++++++++++++++ rabbit.go | 314 +++++++++++++++++++++ 14 files changed, 998 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/copilot.data.migration.ask2agent.xml create mode 100644 .idea/go.imports.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/php.xml create mode 100644 .idea/rb-search.iml create mode 100644 .idea/vcs.xml create mode 100644 README.md create mode 100644 config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 rabbit.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dc5a314 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/rb-search +*.test +*.out diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..30cf57e --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,10 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Ignored default folder with query files +/queries/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/copilot.data.migration.ask2agent.xml b/.idea/copilot.data.migration.ask2agent.xml new file mode 100644 index 0000000..1f2ea11 --- /dev/null +++ b/.idea/copilot.data.migration.ask2agent.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/go.imports.xml b/.idea/go.imports.xml new file mode 100644 index 0000000..d7202f0 --- /dev/null +++ b/.idea/go.imports.xml @@ -0,0 +1,11 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..34ca45b --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/php.xml b/.idea/php.xml new file mode 100644 index 0000000..f324872 --- /dev/null +++ b/.idea/php.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/rb-search.iml b/.idea/rb-search.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/rb-search.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..20d26f5 --- /dev/null +++ b/README.md @@ -0,0 +1,241 @@ +# rb-search + +Консольная утилита для поиска по сообщениям в очередях RabbitMQ **без их потребления**. + +## Что делает + +1. Считывает глубину очереди `N` через `queue.declare-passive`. +2. Делает `N` раз `basic.get` с `autoAck=false` — сообщения становятся `unacked`, + для других консьюмеров очередь выглядит пустой, но физически они остаются на брокере. +3. В памяти проверяет каждое тело на подстроку и печатает совпадения (или все + сообщения, если фильтр не задан). +4. Делает `basic.nack(requeue=true)` по delivery-тегам **в обратном порядке** — + каждое `nack` возвращает сообщение в голову очереди, и после прохода + очередь восстанавливается в исходном виде. Без дублей, без потери + `properties` / `headers` (сообщения ни разу не публикуются заново — это те же + фреймы с другим ack-состоянием). + +На любой ветке выхода (ошибка, Ctrl+C, паника) defer гарантирует reverse-nack +до закрытия канала. Если по какой-то причине nack не успел — закрытие канала +автоматически возвращает unacked-сообщения, но порядок в этом случае не +гарантируется. + +## Установка + +Нужен Go 1.22+. Бинарь самодостаточный, без внешних зависимостей в рантайме. + +### Из исходников (любая ОС) + +```bash +git clone ssh://git@git.arx.ru:2222/shuricken/rb-search.git +cd rb-search +go build -o rb-search . +``` + +Готовый бинарь — `./rb-search`. Положить на `PATH`: + +- Linux/macOS: `install -m 0755 rb-search ~/.local/bin/` или `sudo mv rb-search /usr/local/bin/` +- Windows (PowerShell): `Move-Item rb-search.exe $env:USERPROFILE\bin\` + +### Через `go install` + +```bash +go install git.arx.ru/shuricken/rb-search@latest +``` + +Бинарь окажется в `$(go env GOBIN)` (по умолчанию `~/go/bin`). Добавить в `PATH`: + +- bash/zsh: `export PATH="$PATH:$(go env GOPATH)/bin"` в `~/.bashrc` / `~/.zshrc` +- fish: `fish_add_path (go env GOPATH)/bin` +- Windows: `setx PATH "%PATH%;%USERPROFILE%\go\bin"` (новые терминалы) + +### Arch Linux + +```bash +sudo pacman -S go +go install git.arx.ru/shuricken/rb-search@latest +``` + +### Debian / Ubuntu + +```bash +sudo apt install golang-go +go install git.arx.ru/shuricken/rb-search@latest +``` + +Если в репозитории старый Go (< 1.22) — собрать из исходников после установки +`go` через `snap` или `gvm`. + +### macOS + +```bash +brew install go +go install git.arx.ru/shuricken/rb-search@latest +``` + +### Windows + +1. Поставить Go с https://go.dev/dl/. +2. В PowerShell: `go install git.arx.ru/shuricken/rb-search@latest`. +3. Бинарь окажется в `%USERPROFILE%\go\bin\rb-search.exe`. + +### Кросс-компиляция + +С Linux собрать под другую платформу: + +```bash +GOOS=darwin GOARCH=arm64 go build -o rb-search-darwin-arm64 . +GOOS=windows GOARCH=amd64 go build -o rb-search.exe . +``` + +## Конфигурация + +Два независимых источника (приоритет — конфиг-файл, если он есть). + +### JSON-конфиг + +Путь поиска: `--config` → `$RB_SEARCH_CONFIG` → `$XDG_CONFIG_HOME/rb-search/config.json` +→ `~/.config/rb-search/config.json`. + +Создать шаблон: + +```bash +rb-search init +``` + +Формат: + +```json +{ + "default": "prod", + "servers": { + "local": { + "url": "amqp://guest:guest@localhost:5672/" + }, + "prod": { + "url": "amqp://admin:secret@rabbit.prod:5672/myvhost" + }, + "arx": { + "url": "amqp://admin:REDACTED@10.49.150.156:5672/" + } + } +} +``` + +Управление: + +```bash +rb-search list # показать серверы, default помечен * +rb-search set-default prod # сменить default +``` + +### Переменные окружения + +Если конфиг-файла нет и заданы переменные — синтезируется сервер `env`, он же +default: + +```bash +export EVENTBUS_AMQP_HOST=10.49.150.156 # обязательная +export EVENTBUS_AMQP_USER=admin +export EVENTBUS_AMQP_PASS=REDACTED +export EVENTBUS_AMQP_PORT=5672 # опционально, по умолчанию 5672 +export EVENTBUS_AMQP_VHOST= # опционально, по умолчанию / +rb-search --queue event +``` + +Существующий конфиг полностью имеет приоритет — env в этом случае игнорируется, +чтобы поведение было однозначным. + +## Использование + +```text +rb-search [flags] # читать default-сервер +rb-search --server NAME --queue Q # выбрать сервер вручную +rb-search list # список серверов +rb-search set-default NAME # сменить default +rb-search init # создать sample-конфиг +``` + +Флаги: + +| Флаг | Назначение | +|----------------|-----------------------------------------------------------------------------------------------------| +| `--config PATH`| Путь к конфигу. | +| `--server NAME`| Имя сервера из конфига (override default). | +| `--queue NAME` | Очередь для сканирования (обязательный). | +| `--search S` | Подстрока для поиска в теле. Пусто — печатать все. | +| `--full` | Печатать полный конверт (exchange, routing-key, headers, properties). По умолчанию — только тело. | +| `--json` | Машинный вывод. С `--full` — объект-конверт; без `--full` — стрим тел. | +| `--max-body N` | Обрезать тело до N байт при печати (`0` — без ограничения). | +| `--timeout D` | Таймаут подключения, например `15s`. По умолчанию `30s`. | + +### Матрица режимов вывода + +| Флаги | stdout | +|-------------------|---------------------------------------------------------------------| +| `(default)` | Тело, pretty-print если JSON, маркер `--- #N tag=… ---` в stderr. | +| `--full` | Конверт + тело (тело pretty-print если JSON), человекочитаемо. | +| `--json` | Стрим тел: компактный JSON (если тело — JSON) или JSON-строка. | +| `--json --full` | Один JSON-объект на строку с полным конвертом; `body` — вложенный JSON, если разбирается. | + +Диагностика (`queue "…" depth=N`, `scanned=N matched=M`, маркеры сообщений) +идёт в **stderr**, чтобы stdout оставался парсимым `jq`/файлом. + +### Примеры + +```bash +# Все сообщения как pretty JSON +rb-search --queue event + +# Поиск с подсветкой (ANSI inverse video) +rb-search --queue event --search "OLV-3643" + +# Полный конверт в человекочитаемом виде +rb-search --queue event --full + +# Стрим тел через jq +rb-search --queue event --json | jq + +# Собрать тела в массив +rb-search --queue event --json | jq -s . + +# Извлечь поле из каждого тела +rb-search --queue event --json --search change | jq '.offerAcceptance.id' + +# Полные сообщения с разобранным body +rb-search --queue event --json --full | jq '.body.action' + +# Только тела в файл +rb-search --queue event > bodies.txt # маркеры остались в терминале (stderr) + +# Без болтовни +rb-search --queue event --json 2>/dev/null | jq + +# Другой сервер из конфига +rb-search --server prod --queue billing.events --search "invoice-42" + +# С env-переменными, без конфиг-файла +EVENTBUS_AMQP_HOST=10.49.150.156 EVENTBUS_AMQP_USER=admin EVENTBUS_AMQP_PASS=REDACTED \ + rb-search --queue event +``` + +### Коды выхода + +- `0` — успех (включая отсутствие совпадений). +- `1` — ошибка рантайма (нет коннекта, не существует очередь, ошибка AMQP, …). +- `2` — неверное использование флагов. + +## Ограничения + +- Очередь должна существовать. `queue.declare-passive` сначала пробуется как + `durable`, при mismatch — переоткрывается канал и пробуется как + non-durable. Декларация с `auto-delete=true` / `exclusive=true` может не + совпасть — это редко мешает на практике. +- `basic.get` читает только то, что есть в очереди на момент старта. Сообщения, + пришедшие во время сканирования, будут увидены при следующем запуске. +- Поиск — побайтовый `Contains` по сырому телу. Если тело сжато + (`content-encoding: gzip`), искать нужно по сжатому представлению или + разжимать руками. +- Параллельно работающие консьюмеры на этой же очереди в момент сканирования + будут видеть очередь как «пустую» — это нормально, после reverse-nack + обычная работа возобновится. diff --git a/config.go b/config.go new file mode 100644 index 0000000..b87d7ca --- /dev/null +++ b/config.go @@ -0,0 +1,152 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/url" + "os" + "path/filepath" + "strings" +) + +type ServerConfig struct { + URL string `json:"url"` +} + +type Config struct { + Default string `json:"default"` + Servers map[string]ServerConfig `json:"servers"` + + path string + fromEnv bool +} + +const envServerName = "env" + +func defaultConfigPath() string { + if p := os.Getenv("RB_SEARCH_CONFIG"); p != "" { + return p + } + if x := os.Getenv("XDG_CONFIG_HOME"); x != "" { + return filepath.Join(x, "rb-search", "config.json") + } + if h, err := os.UserHomeDir(); err == nil { + return filepath.Join(h, ".config", "rb-search", "config.json") + } + return "config.json" +} + +func loadConfig(path string) (*Config, error) { + explicit := path != "" + if path == "" { + path = defaultConfigPath() + } + data, err := os.ReadFile(path) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + if envCfg := configFromEnv(); envCfg != nil { + envCfg.path = path + return envCfg, nil + } + hint := "EVENTBUS_AMQP_HOST/USER/PASS env vars not set either" + if explicit { + return nil, fmt.Errorf("config not found at %s (%s)", path, hint) + } + return nil, fmt.Errorf("config not found at %s (%s; run `rb-search init` to create one)", path, hint) + } + var c Config + if err := json.Unmarshal(data, &c); err != nil { + return nil, fmt.Errorf("parse %s: %w", path, err) + } + if c.Servers == nil { + c.Servers = map[string]ServerConfig{} + } + c.path = path + return &c, nil +} + +// configFromEnv builds a synthetic in-memory config from EVENTBUS_AMQP_*. +// Returns nil when EVENTBUS_AMQP_HOST is unset — host is the only required field. +func configFromEnv() *Config { + host := strings.TrimSpace(os.Getenv("EVENTBUS_AMQP_HOST")) + if host == "" { + return nil + } + port := os.Getenv("EVENTBUS_AMQP_PORT") + if port == "" { + port = "5672" + } + vhost := os.Getenv("EVENTBUS_AMQP_VHOST") + user := os.Getenv("EVENTBUS_AMQP_USER") + pass := os.Getenv("EVENTBUS_AMQP_PASS") + + u := &url.URL{ + Scheme: "amqp", + Host: host + ":" + port, + Path: "/" + strings.TrimPrefix(vhost, "/"), + } + if user != "" { + u.User = url.UserPassword(user, pass) + } + return &Config{ + Default: envServerName, + Servers: map[string]ServerConfig{ + envServerName: {URL: u.String()}, + }, + fromEnv: true, + } +} + +func (c *Config) save() error { + if c.fromEnv { + return fmt.Errorf("config was synthesized from EVENTBUS_AMQP_* env vars; run `rb-search init` to materialize a file first") + } + if err := os.MkdirAll(filepath.Dir(c.path), 0o755); err != nil { + return err + } + data, err := json.MarshalIndent(c, "", " ") + if err != nil { + return err + } + tmp := c.path + ".tmp" + if err := os.WriteFile(tmp, append(data, '\n'), 0o600); err != nil { + return err + } + return os.Rename(tmp, c.path) +} + +func (c *Config) resolveServer(name string) (string, ServerConfig, error) { + if name == "" { + name = c.Default + } + if name == "" { + return "", ServerConfig{}, fmt.Errorf("no server specified and no default set") + } + srv, ok := c.Servers[name] + if !ok { + return "", ServerConfig{}, fmt.Errorf("unknown server %q", name) + } + if srv.URL == "" { + return "", ServerConfig{}, fmt.Errorf("server %q has empty url", name) + } + return name, srv, nil +} + +func writeSampleConfig(path string) error { + if path == "" { + path = defaultConfigPath() + } + if _, err := os.Stat(path); err == nil { + return fmt.Errorf("%s already exists", path) + } + sample := Config{ + Default: "local", + Servers: map[string]ServerConfig{ + "local": {URL: "amqp://guest:guest@localhost:5672/"}, + }, + path: path, + } + return sample.save() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f144f95 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.arx.ru/shuricken/rb-search + +go 1.22 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/main.go b/main.go new file mode 100644 index 0000000..33c4a09 --- /dev/null +++ b/main.go @@ -0,0 +1,210 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "sort" + "syscall" + "time" +) + +const usage = `rb-search — peek into a RabbitMQ queue without consuming. + +Reads N=queue-depth messages with basic.get + manual ack, runs a substring +search in memory, prints matches, then requeues everything in reverse order +so the queue is restored byte-for-byte. Properties and headers are preserved +because messages are never republished. + +Usage: + rb-search [flags] # scan default server's queue + rb-search --server prod --queue q # scan a specific server + rb-search list # list configured servers + rb-search set-default # mark a server as default + rb-search init # write a sample config + +Config: + Loaded from --config or $RB_SEARCH_CONFIG or + $XDG_CONFIG_HOME/rb-search/config.json (default ~/.config/rb-search/config.json). + + If no config file exists, rb-search falls back to env vars: + EVENTBUS_AMQP_HOST (required to enable the fallback) + EVENTBUS_AMQP_USER, EVENTBUS_AMQP_PASS + EVENTBUS_AMQP_PORT (default 5672) + EVENTBUS_AMQP_VHOST (default /) + These materialize a synthetic server named "env" that becomes the default. + +Flags: + --config PATH Config file path. + --server NAME Server alias (overrides 'default'). + --queue NAME Queue to scan. Required for the scan command. + --search TEXT Substring to match against message body. Empty -> print all. + --full Include envelope (exchange, routing-key, headers, props). + Default prints only the body, pretty-printed if it is JSON. + --max-body N Truncate body to N bytes when printing (0 = full). Default 0. + --timeout DUR AMQP dial timeout. Default 30s. + --json Machine-readable output. Combine with --full to emit a + full envelope object per message; without --full, emits + one body per line (compact JSON if the body is JSON, + otherwise the body as a JSON string). + +Exit status: + 0 success (any number of matches) + 1 runtime error + 2 bad usage +` + +func main() { + if err := run(os.Args[1:]); err != nil { + fmt.Fprintln(os.Stderr, "error:", err) + os.Exit(1) + } +} + +func run(args []string) error { + if len(args) > 0 { + switch args[0] { + case "list", "ls": + return cmdList(args[1:]) + case "set-default": + return cmdSetDefault(args[1:]) + case "init": + return cmdInit(args[1:]) + case "help", "-h", "--help": + fmt.Print(usage) + return nil + } + } + return cmdScan(args) +} + +func newFlagSet(name string) *flag.FlagSet { + fs := flag.NewFlagSet(name, flag.ContinueOnError) + fs.SetOutput(os.Stderr) + fs.Usage = func() { + fmt.Fprint(os.Stderr, usage) + } + return fs +} + +func cmdScan(args []string) error { + fs := newFlagSet("scan") + configPath := fs.String("config", "", "config file path") + server := fs.String("server", "", "server alias") + queue := fs.String("queue", "", "queue name") + search := fs.String("search", "", "substring to match in body") + maxBody := fs.Int("max-body", 0, "truncate body to N bytes (0 = full)") + timeout := fs.Duration("timeout", 30*time.Second, "dial timeout") + jsonOut := fs.Bool("json", false, "machine-readable JSON output (body only; combine with --full for envelope)") + full := fs.Bool("full", false, "print full envelope (exchange, headers, props) instead of body only") + if err := fs.Parse(args); err != nil { + return err + } + if *queue == "" { + fs.Usage() + return fmt.Errorf("--queue is required") + } + + cfg, err := loadConfig(*configPath) + if err != nil { + return err + } + name, srv, err := cfg.resolveServer(*server) + if err != nil { + return err + } + origin := "config" + if cfg.fromEnv { + origin = "env" + } + fmt.Fprintf(os.Stderr, "[rb-search] server=%s (%s) queue=%s search=%q\n", name, origin, *queue, *search) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + return scanQueue(ctx, srv.URL, scanOptions{ + queue: *queue, + search: *search, + maxBody: *maxBody, + timeout: *timeout, + out: os.Stdout, + diag: os.Stderr, + jsonOut: *jsonOut, + full: *full, + }) +} + +func cmdList(args []string) error { + fs := newFlagSet("list") + configPath := fs.String("config", "", "config file path") + if err := fs.Parse(args); err != nil { + return err + } + cfg, err := loadConfig(*configPath) + if err != nil { + return err + } + names := make([]string, 0, len(cfg.Servers)) + for n := range cfg.Servers { + names = append(names, n) + } + sort.Strings(names) + for _, n := range names { + marker := " " + if n == cfg.Default { + marker = "*" + } + fmt.Printf("%s %-15s %s\n", marker, n, cfg.Servers[n].URL) + } + if len(names) == 0 { + fmt.Println("(no servers configured)") + } + if cfg.fromEnv { + fmt.Fprintln(os.Stderr, "(synthetic config from EVENTBUS_AMQP_* env vars; run `rb-search init` to persist)") + } + return nil +} + +func cmdSetDefault(args []string) error { + fs := newFlagSet("set-default") + configPath := fs.String("config", "", "config file path") + if err := fs.Parse(args); err != nil { + return err + } + if fs.NArg() != 1 { + return fmt.Errorf("usage: rb-search set-default ") + } + name := fs.Arg(0) + cfg, err := loadConfig(*configPath) + if err != nil { + return err + } + if _, ok := cfg.Servers[name]; !ok { + return fmt.Errorf("unknown server %q", name) + } + cfg.Default = name + if err := cfg.save(); err != nil { + return err + } + fmt.Printf("default set to %q in %s\n", name, cfg.path) + return nil +} + +func cmdInit(args []string) error { + fs := newFlagSet("init") + configPath := fs.String("config", "", "config file path") + if err := fs.Parse(args); err != nil { + return err + } + path := *configPath + if path == "" { + path = defaultConfigPath() + } + if err := writeSampleConfig(path); err != nil { + return err + } + fmt.Printf("wrote sample config to %s\n", path) + return nil +} diff --git a/rabbit.go b/rabbit.go new file mode 100644 index 0000000..0d8bdb3 --- /dev/null +++ b/rabbit.go @@ -0,0 +1,314 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type capturedMsg struct { + idx int + delivery amqp.Delivery +} + +type scanOptions struct { + queue string + search string + maxBody int + timeout time.Duration + out io.Writer // message payloads + diag io.Writer // queue stats + scan summary; kept off stdout so --json pipes cleanly + jsonOut bool + full bool // include envelope (exchange, routing-key, headers, ...) in human output +} + +func scanQueue(ctx context.Context, url string, opts scanOptions) error { + if opts.out == nil { + opts.out = io.Discard + } + if opts.diag == nil { + opts.diag = io.Discard + } + if opts.timeout == 0 { + opts.timeout = 30 * time.Second + } + + conn, err := amqp.DialConfig(url, amqp.Config{ + Heartbeat: 10 * time.Second, + Locale: "en_US", + Dial: amqp.DefaultDial(opts.timeout), + }) + if err != nil { + return fmt.Errorf("connect: %w", err) + } + defer conn.Close() + + ch, q, err := inspectQueue(conn, opts.queue) + if err != nil { + return err + } + defer ch.Close() + + depth := q.Messages + fmt.Fprintf(opts.diag, "queue %q depth=%d consumers=%d\n", q.Name, depth, q.Consumers) + if depth == 0 { + return nil + } + + captured := make([]capturedMsg, 0, depth) + + // Failsafe: requeue in reverse order on any exit path. Channel close also + // auto-requeues but explicit reverse-order nacks preserve queue ordering. + defer func() { + for i := len(captured) - 1; i >= 0; i-- { + _ = ch.Nack(captured[i].delivery.DeliveryTag, false, true) + } + }() + + for i := 0; i < depth; i++ { + if err := ctx.Err(); err != nil { + return err + } + msg, ok, err := ch.Get(opts.queue, false) + if err != nil { + return fmt.Errorf("basic.get #%d: %w", i, err) + } + if !ok { + break + } + captured = append(captured, capturedMsg{idx: i, delivery: msg}) + } + + needle := []byte(opts.search) + matches := 0 + for _, c := range captured { + if opts.search != "" && !bytes.Contains(c.delivery.Body, needle) { + continue + } + matches++ + switch { + case opts.jsonOut && opts.full: + printJSONFull(opts.out, c.idx, c.delivery) + case opts.jsonOut: + printJSONBody(opts.out, c.delivery) + case opts.full: + printFull(opts.out, c.idx, c.delivery, opts.maxBody, opts.search) + default: + printBodyOnly(opts.out, opts.diag, c.idx, c.delivery, opts.maxBody, opts.search) + } + } + + if opts.search != "" { + fmt.Fprintf(opts.diag, "scanned=%d matched=%d\n", len(captured), matches) + } else { + fmt.Fprintf(opts.diag, "scanned=%d\n", len(captured)) + } + return nil +} + +func inspectQueue(conn *amqp.Connection, name string) (*amqp.Channel, amqp.Queue, error) { + ch, err := conn.Channel() + if err != nil { + return nil, amqp.Queue{}, fmt.Errorf("channel: %w", err) + } + if q, err := ch.QueueDeclarePassive(name, true, false, false, false, nil); err == nil { + return ch, q, nil + } + ch.Close() + ch, err = conn.Channel() + if err != nil { + return nil, amqp.Queue{}, fmt.Errorf("channel: %w", err) + } + q, err := ch.QueueDeclarePassive(name, false, false, false, false, nil) + if err != nil { + ch.Close() + return nil, amqp.Queue{}, fmt.Errorf("queue inspect: %w", err) + } + return ch, q, nil +} + +// formatBody pretty-prints JSON bodies and decodes Unicode escapes; non-JSON +// bodies are returned as-is. +func formatBody(body []byte) []byte { + trimmed := bytes.TrimSpace(body) + if len(trimmed) == 0 || (trimmed[0] != '{' && trimmed[0] != '[') { + return body + } + var v any + if err := json.Unmarshal(trimmed, &v); err != nil { + return body + } + pretty, err := json.MarshalIndent(v, "", " ") + if err != nil { + return body + } + return pretty +} + +func truncate(b []byte, max int) (out []byte, truncated bool) { + if max <= 0 || len(b) <= max { + return b, false + } + return b[:max], true +} + +// printBodyOnly writes the message body to `w` (stdout-bound) and a per-message +// marker to `diag` (stderr-bound) so `rb-search ... | jq` sees a clean stream +// of JSON bodies while a human still sees which delivery is which. +func printBodyOnly(w io.Writer, diag io.Writer, idx int, d amqp.Delivery, maxBody int, search string) { + fmt.Fprintf(diag, "--- #%d tag=%d rk=%s ---\n", idx, d.DeliveryTag, d.RoutingKey) + body := formatBody(d.Body) + body, truncated := truncate(body, maxBody) + w.Write(highlight(body, search)) + if truncated { + fmt.Fprintf(w, "\n... (truncated, use --max-body=0 for full)") + } + if len(body) == 0 || body[len(body)-1] != '\n' { + fmt.Fprintln(w) + } +} + +func printFull(w io.Writer, idx int, d amqp.Delivery, maxBody int, search string) { + fmt.Fprintf(w, "=== #%d tag=%d exchange=%q routing-key=%q\n", idx, d.DeliveryTag, d.Exchange, d.RoutingKey) + if d.ContentType != "" || d.ContentEncoding != "" { + fmt.Fprintf(w, " content-type=%s content-encoding=%s\n", d.ContentType, d.ContentEncoding) + } + if d.MessageId != "" || d.CorrelationId != "" { + fmt.Fprintf(w, " message-id=%s correlation-id=%s\n", d.MessageId, d.CorrelationId) + } + if !d.Timestamp.IsZero() { + fmt.Fprintf(w, " timestamp=%s\n", d.Timestamp.Format(time.RFC3339)) + } + if len(d.Headers) > 0 { + hb, _ := json.Marshal(headersToMap(d.Headers)) + fmt.Fprintf(w, " headers=%s\n", hb) + } + + body := formatBody(d.Body) + body, truncated := truncate(body, maxBody) + fmt.Fprintf(w, " body(%d bytes):\n", len(d.Body)) + w.Write(indent(highlight(body, search), " ")) + if truncated { + fmt.Fprintf(w, "\n ... (truncated, use --max-body=0 for full)") + } + fmt.Fprintln(w) +} + +// printJSONBody emits just the message body as a single JSON value per line: +// if the body is already valid JSON, it is passed through compactly; otherwise +// it is encoded as a JSON string so the stdout stream stays parseable. +func printJSONBody(w io.Writer, d amqp.Delivery) { + trimmed := bytes.TrimSpace(d.Body) + if len(trimmed) > 0 && (trimmed[0] == '{' || trimmed[0] == '[') && json.Valid(trimmed) { + var compact bytes.Buffer + if err := json.Compact(&compact, trimmed); err == nil { + compact.WriteByte('\n') + w.Write(compact.Bytes()) + return + } + } + _ = json.NewEncoder(w).Encode(string(d.Body)) +} + +func printJSONFull(w io.Writer, idx int, d amqp.Delivery) { + row := map[string]any{ + "idx": idx, + "delivery_tag": d.DeliveryTag, + "exchange": d.Exchange, + "routing_key": d.RoutingKey, + "content_type": d.ContentType, + "content_encoding": d.ContentEncoding, + "message_id": d.MessageId, + "correlation_id": d.CorrelationId, + "headers": headersToMap(d.Headers), + "body": bodyAsJSONValue(d.Body), + } + if !d.Timestamp.IsZero() { + row["timestamp"] = d.Timestamp.Format(time.RFC3339) + } + _ = json.NewEncoder(w).Encode(row) +} + +// bodyAsJSONValue returns the body parsed as a JSON object/array when possible +// so `--json --full` consumers can drill into it with `jq .body.foo` without a +// separate `fromjson`. Falls back to the body as a string for non-JSON payloads. +func bodyAsJSONValue(body []byte) any { + trimmed := bytes.TrimSpace(body) + if len(trimmed) > 0 && (trimmed[0] == '{' || trimmed[0] == '[') { + var v any + if err := json.Unmarshal(trimmed, &v); err == nil { + return v + } + } + return string(body) +} + +func headersToMap(h amqp.Table) map[string]any { + if h == nil { + return nil + } + out := make(map[string]any, len(h)) + for k, v := range h { + out[k] = normalizeAMQPValue(v) + } + return out +} + +func normalizeAMQPValue(v any) any { + switch x := v.(type) { + case []byte: + return string(x) + case amqp.Table: + return headersToMap(x) + case []any: + out := make([]any, len(x)) + for i, e := range x { + out[i] = normalizeAMQPValue(e) + } + return out + default: + return v + } +} + +func indent(b []byte, prefix string) []byte { + if len(b) == 0 { + return []byte(prefix) + } + var buf bytes.Buffer + buf.WriteString(prefix) + for i, line := range bytes.Split(b, []byte{'\n'}) { + if i > 0 { + buf.WriteByte('\n') + buf.WriteString(prefix) + } + buf.Write(line) + } + return buf.Bytes() +} + +func highlight(body []byte, search string) []byte { + if search == "" { + return body + } + if !bytes.Contains(body, []byte(search)) { + return body + } + const on, off = "\x1b[7m", "\x1b[0m" + parts := bytes.Split(body, []byte(search)) + var buf bytes.Buffer + for i, p := range parts { + buf.Write(p) + if i < len(parts)-1 { + buf.WriteString(on) + buf.WriteString(search) + buf.WriteString(off) + } + } + return buf.Bytes() +}