package main import ( "context" "flag" "fmt" "os" "os/signal" "sort" "syscall" "time" ) // version is overridden at build time via -ldflags '-X main.version=...'. var version = "dev" const usage = `rb-search — peek into a RabbitMQ queue without consuming. Reads N=queue-depth messages with basic.get + manual ack, runs a substring search in memory, prints matches, then requeues everything in reverse order so the queue is restored byte-for-byte. Properties and headers are preserved because messages are never republished. Usage: rb-search [flags] # scan default server's queue rb-search --server prod --queue q # scan a specific server rb-search list # list configured servers rb-search set-default # mark a server as default rb-search init # write a sample config Config: Loaded from --config or $RB_SEARCH_CONFIG or $XDG_CONFIG_HOME/rb-search/config.json (default ~/.config/rb-search/config.json). If no config file exists, rb-search falls back to env vars: EVENTBUS_AMQP_HOST (required to enable the fallback) EVENTBUS_AMQP_USER, EVENTBUS_AMQP_PASS EVENTBUS_AMQP_PORT (default 5672) EVENTBUS_AMQP_VHOST (default /) These materialize a synthetic server named "env" that becomes the default. Flags: --config PATH Config file path. --server NAME Server alias (overrides 'default'). --queue NAME Queue to scan. Required for the scan command. --search TEXT Substring to match against message body. Empty -> print all. --full Include envelope (exchange, routing-key, headers, props). Default prints only the body, pretty-printed if it is JSON. --max-body N Truncate body to N bytes when printing (0 = full). Default 0. --timeout DUR AMQP dial timeout. Default 30s. --json Machine-readable output. Combine with --full to emit a full envelope object per message; without --full, emits one body per line (compact JSON if the body is JSON, otherwise the body as a JSON string). Exit status: 0 success (any number of matches) 1 runtime error 2 bad usage ` func main() { if err := run(os.Args[1:]); err != nil { fmt.Fprintln(os.Stderr, "error:", err) os.Exit(1) } } func run(args []string) error { if len(args) > 0 { switch args[0] { case "list", "ls": return cmdList(args[1:]) case "set-default": return cmdSetDefault(args[1:]) case "init": return cmdInit(args[1:]) case "version", "--version", "-V": fmt.Println(version) return nil case "help", "-h", "--help": fmt.Print(usage) return nil } } return cmdScan(args) } func newFlagSet(name string) *flag.FlagSet { fs := flag.NewFlagSet(name, flag.ContinueOnError) fs.SetOutput(os.Stderr) fs.Usage = func() { fmt.Fprint(os.Stderr, usage) } return fs } func cmdScan(args []string) error { fs := newFlagSet("scan") configPath := fs.String("config", "", "config file path") server := fs.String("server", "", "server alias") queue := fs.String("queue", "", "queue name") search := fs.String("search", "", "substring to match in body") maxBody := fs.Int("max-body", 0, "truncate body to N bytes (0 = full)") timeout := fs.Duration("timeout", 30*time.Second, "dial timeout") jsonOut := fs.Bool("json", false, "machine-readable JSON output (body only; combine with --full for envelope)") full := fs.Bool("full", false, "print full envelope (exchange, headers, props) instead of body only") if err := fs.Parse(args); err != nil { return err } if *queue == "" { fs.Usage() return fmt.Errorf("--queue is required") } cfg, err := loadConfig(*configPath) if err != nil { return err } name, srv, err := cfg.resolveServer(*server) if err != nil { return err } origin := "config" if cfg.fromEnv { origin = "env" } fmt.Fprintf(os.Stderr, "[rb-search] server=%s (%s) queue=%s search=%q\n", name, origin, *queue, *search) ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() return scanQueue(ctx, srv.URL, scanOptions{ queue: *queue, search: *search, maxBody: *maxBody, timeout: *timeout, out: os.Stdout, diag: os.Stderr, jsonOut: *jsonOut, full: *full, }) } func cmdList(args []string) error { fs := newFlagSet("list") configPath := fs.String("config", "", "config file path") if err := fs.Parse(args); err != nil { return err } cfg, err := loadConfig(*configPath) if err != nil { return err } names := make([]string, 0, len(cfg.Servers)) for n := range cfg.Servers { names = append(names, n) } sort.Strings(names) for _, n := range names { marker := " " if n == cfg.Default { marker = "*" } fmt.Printf("%s %-15s %s\n", marker, n, cfg.Servers[n].URL) } if len(names) == 0 { fmt.Println("(no servers configured)") } if cfg.fromEnv { fmt.Fprintln(os.Stderr, "(synthetic config from EVENTBUS_AMQP_* env vars; run `rb-search init` to persist)") } return nil } func cmdSetDefault(args []string) error { fs := newFlagSet("set-default") configPath := fs.String("config", "", "config file path") if err := fs.Parse(args); err != nil { return err } if fs.NArg() != 1 { return fmt.Errorf("usage: rb-search set-default ") } name := fs.Arg(0) cfg, err := loadConfig(*configPath) if err != nil { return err } if _, ok := cfg.Servers[name]; !ok { return fmt.Errorf("unknown server %q", name) } cfg.Default = name if err := cfg.save(); err != nil { return err } fmt.Printf("default set to %q in %s\n", name, cfg.path) return nil } func cmdInit(args []string) error { fs := newFlagSet("init") configPath := fs.String("config", "", "config file path") if err := fs.Parse(args); err != nil { return err } path := *configPath if path == "" { path = defaultConfigPath() } if err := writeSampleConfig(path); err != nil { return err } fmt.Printf("wrote sample config to %s\n", path) return nil }