Files
rb-search/main.go
Alex Shlyakhov be3251716c
Some checks failed
release / release (push) Has been cancelled
Build infrastructure for releases
- Makefile: cross-compile to linux/{amd64,arm64}, darwin/{amd64,arm64},
  windows/amd64 into dist/ with SHA256SUMS. Version is stamped via
  -ldflags '-X main.version=...'.
- .gitea/workflows/release.yml: on tag push (v*) the workflow runs
  `make dist`, creates a Gitea release for the tag and uploads every
  artefact from dist/ via the Gitea API.
- main.go: `rb-search version` / --version prints the stamped version.
- README: new "Готовые бинари" section with curl/Invoke-WebRequest
  install snippets and a checksum-verify hint.
2026-05-26 13:46:16 +05:00

217 lines
5.9 KiB
Go

package main
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"sort"
	"syscall"
	"time"
)
// version is the string printed by `rb-search version` / --version / -V.
// It is overridden at build time via -ldflags '-X main.version=...'; the
// linker's -X only rewrites package-level string variables, so this must
// remain a var (not a const). "dev" is what un-stamped builds report.
var version = "dev"
// usage is the complete help text for the tool. It is written to stdout by
// the help command in run and to stderr by the Usage hook that newFlagSet
// installs on every FlagSet. The text is emitted verbatim at runtime, so any
// edit here changes user-visible output.
const usage = `rb-search — peek into a RabbitMQ queue without consuming.
Reads N=queue-depth messages with basic.get + manual ack, runs a substring
search in memory, prints matches, then requeues everything in reverse order
so the queue is restored byte-for-byte. Properties and headers are preserved
because messages are never republished.
Usage:
rb-search [flags] # scan default server's queue
rb-search --server prod --queue q # scan a specific server
rb-search list # list configured servers
rb-search set-default <name> # mark a server as default
rb-search init # write a sample config
Config:
Loaded from --config or $RB_SEARCH_CONFIG or
$XDG_CONFIG_HOME/rb-search/config.json (default ~/.config/rb-search/config.json).
If no config file exists, rb-search falls back to env vars:
EVENTBUS_AMQP_HOST (required to enable the fallback)
EVENTBUS_AMQP_USER, EVENTBUS_AMQP_PASS
EVENTBUS_AMQP_PORT (default 5672)
EVENTBUS_AMQP_VHOST (default /)
These materialize a synthetic server named "env" that becomes the default.
Flags:
--config PATH Config file path.
--server NAME Server alias (overrides 'default').
--queue NAME Queue to scan. Required for the scan command.
--search TEXT Substring to match against message body. Empty -> print all.
--full Include envelope (exchange, routing-key, headers, props).
Default prints only the body, pretty-printed if it is JSON.
--max-body N Truncate body to N bytes when printing (0 = full). Default 0.
--timeout DUR AMQP dial timeout. Default 30s.
--json Machine-readable output. Combine with --full to emit a
full envelope object per message; without --full, emits
one body per line (compact JSON if the body is JSON,
otherwise the body as a JSON string).
Exit status:
0 success (any number of matches)
1 runtime error
2 bad usage
`
// main runs the CLI with the process arguments and maps the result to an
// exit status:
//
//   - nil error: exit 0.
//   - flag.ErrHelp: exit 0. The FlagSet has already printed the usage text
//     through its Usage hook, so a help request (-h / -help on any
//     subcommand) is not reported as a failure.
//   - any other error: print "error: <err>" to stderr and exit 1.
//
// NOTE(review): the usage text advertises exit status 2 for bad usage, but
// the errors returned by run carry no marker that distinguishes usage
// mistakes from runtime failures, so both currently exit 1.
func main() {
	err := run(os.Args[1:])
	if err == nil || errors.Is(err, flag.ErrHelp) {
		return
	}
	fmt.Fprintln(os.Stderr, "error:", err)
	os.Exit(1)
}
func run(args []string) error {
if len(args) > 0 {
switch args[0] {
case "list", "ls":
return cmdList(args[1:])
case "set-default":
return cmdSetDefault(args[1:])
case "init":
return cmdInit(args[1:])
case "version", "--version", "-V":
fmt.Println(version)
return nil
case "help", "-h", "--help":
fmt.Print(usage)
return nil
}
}
return cmdScan(args)
}
// newFlagSet returns the FlagSet every subcommand starts from. Parse errors
// are returned to the caller (ContinueOnError) rather than terminating the
// process, diagnostics go to stderr, and the per-flag default listing is
// replaced by the tool-wide usage text.
func newFlagSet(name string) *flag.FlagSet {
	printUsage := func() {
		fmt.Fprint(os.Stderr, usage)
	}

	set := flag.NewFlagSet(name, flag.ContinueOnError)
	set.Usage = printUsage
	set.SetOutput(os.Stderr)
	return set
}
// cmdScan implements the default command: scan one queue on one server.
//
// It parses the scan flags from args, loads the configuration, resolves the
// target server (the --server alias, or whatever cfg.resolveServer picks for
// an empty alias), writes a one-line summary to stderr and then delegates to
// scanQueue with stdout as the result stream and stderr for diagnostics.
//
// Errors are returned for: flag parse failures, leftover positional
// arguments, a missing --queue (the usage text is printed first), and any
// failure from loadConfig, resolveServer or scanQueue.
func cmdScan(args []string) error {
	fs := newFlagSet("scan")
	configPath := fs.String("config", "", "config file path")
	server := fs.String("server", "", "server alias")
	queue := fs.String("queue", "", "queue name")
	search := fs.String("search", "", "substring to match in body")
	maxBody := fs.Int("max-body", 0, "truncate body to N bytes (0 = full)")
	timeout := fs.Duration("timeout", 30*time.Second, "dial timeout")
	jsonOut := fs.Bool("json", false, "machine-readable JSON output (body only; combine with --full for envelope)")
	full := fs.Bool("full", false, "print full envelope (exchange, headers, props) instead of body only")
	if err := fs.Parse(args); err != nil {
		return err
	}

	// Flag parsing stops at the first non-flag word, so every flag after a
	// stray argument (or a mistyped subcommand) would be silently dropped:
	// for example a --search filter would be ignored and every message printed.
	// Reject leftovers instead of scanning with half the options.
	if fs.NArg() > 0 {
		return fmt.Errorf("unexpected argument %q: scan accepts flags only", fs.Arg(0))
	}
	if *queue == "" {
		fs.Usage()
		return fmt.Errorf("--queue is required")
	}

	cfg, err := loadConfig(*configPath)
	if err != nil {
		return err
	}
	name, srv, err := cfg.resolveServer(*server)
	if err != nil {
		return err
	}

	// Tell the operator where the connection settings came from.
	origin := "config"
	if cfg.fromEnv {
		origin = "env"
	}
	fmt.Fprintf(os.Stderr, "[rb-search] server=%s (%s) queue=%s search=%q\n", name, origin, *queue, *search)

	// ctx is cancelled on Ctrl-C (SIGINT) or SIGTERM; how scanQueue reacts
	// to cancellation is defined outside this function.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	return scanQueue(ctx, srv.URL, scanOptions{
		queue:   *queue,
		search:  *search,
		maxBody: *maxBody,
		timeout: *timeout,
		out:     os.Stdout,
		diag:    os.Stderr,
		jsonOut: *jsonOut,
		full:    *full,
	})
}
// cmdList implements `rb-search list`: it prints every configured server on
// stdout, one per line and sorted by alias, with "*" in the first column for
// the default server. When no servers exist a placeholder line is printed
// instead. If the configuration was synthesized from environment variables
// (cfg.fromEnv), a hint about persisting it is written to stderr.
func cmdList(args []string) error {
	flags := newFlagSet("list")
	cfgFile := flags.String("config", "", "config file path")
	if err := flags.Parse(args); err != nil {
		return err
	}

	cfg, err := loadConfig(*cfgFile)
	if err != nil {
		return err
	}

	// Map iteration order is random; collect and sort the aliases so the
	// listing is stable between runs.
	aliases := make([]string, 0, len(cfg.Servers))
	for alias := range cfg.Servers {
		aliases = append(aliases, alias)
	}
	sort.Strings(aliases)

	if len(aliases) == 0 {
		fmt.Println("(no servers configured)")
	}
	for _, alias := range aliases {
		mark := " "
		if cfg.Default == alias {
			mark = "*"
		}
		fmt.Fprintf(os.Stdout, "%s %-15s %s\n", mark, alias, cfg.Servers[alias].URL)
	}

	if cfg.fromEnv {
		fmt.Fprintln(os.Stderr, "(synthetic config from EVENTBUS_AMQP_* env vars; run `rb-search init` to persist)")
	}
	return nil
}
// cmdSetDefault implements `rb-search set-default <name>`: it marks an
// already-configured server as the default and saves the configuration.
//
// Exactly one positional argument (the server alias) is required. The alias
// must exist in cfg.Servers, otherwise an "unknown server" error is returned
// and nothing is saved. On success a confirmation naming the alias and
// cfg.path is printed to stdout.
func cmdSetDefault(args []string) error {
	flags := newFlagSet("set-default")
	cfgFile := flags.String("config", "", "config file path")
	if err := flags.Parse(args); err != nil {
		return err
	}

	positional := flags.Args()
	if len(positional) != 1 {
		return fmt.Errorf("usage: rb-search set-default <server-name>")
	}
	target := positional[0]

	cfg, err := loadConfig(*cfgFile)
	if err != nil {
		return err
	}
	if _, known := cfg.Servers[target]; !known {
		return fmt.Errorf("unknown server %q", target)
	}

	cfg.Default = target
	if saveErr := cfg.save(); saveErr != nil {
		return saveErr
	}

	fmt.Printf("default set to %q in %s\n", target, cfg.path)
	return nil
}
// cmdInit implements `rb-search init`: it writes a sample configuration file
// and reports the destination on stdout. The destination is the --config
// value when one is given, otherwise the path returned by
// defaultConfigPath().
func cmdInit(args []string) error {
	flags := newFlagSet("init")
	cfgFile := flags.String("config", "", "config file path")
	if err := flags.Parse(args); err != nil {
		return err
	}

	// Only consult the default location when no explicit path was supplied.
	var target string
	if explicit := *cfgFile; explicit != "" {
		target = explicit
	} else {
		target = defaultConfigPath()
	}

	if err := writeSampleConfig(target); err != nil {
		return err
	}

	fmt.Printf("wrote sample config to %s\n", target)
	return nil
}