first commit
This commit is contained in:
210
main.go
Normal file
210
main.go
Normal file
@@ -0,0 +1,210 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
const usage = `rb-search — peek into a RabbitMQ queue without consuming.
|
||||
|
||||
Reads N=queue-depth messages with basic.get + manual ack, runs a substring
|
||||
search in memory, prints matches, then requeues everything in reverse order
|
||||
so the queue is restored byte-for-byte. Properties and headers are preserved
|
||||
because messages are never republished.
|
||||
|
||||
Usage:
|
||||
rb-search [flags] # scan default server's queue
|
||||
rb-search --server prod --queue q # scan a specific server
|
||||
rb-search list # list configured servers
|
||||
rb-search set-default <name> # mark a server as default
|
||||
rb-search init # write a sample config
|
||||
|
||||
Config:
|
||||
Loaded from --config or $RB_SEARCH_CONFIG or
|
||||
$XDG_CONFIG_HOME/rb-search/config.json (default ~/.config/rb-search/config.json).
|
||||
|
||||
If no config file exists, rb-search falls back to env vars:
|
||||
EVENTBUS_AMQP_HOST (required to enable the fallback)
|
||||
EVENTBUS_AMQP_USER, EVENTBUS_AMQP_PASS
|
||||
EVENTBUS_AMQP_PORT (default 5672)
|
||||
EVENTBUS_AMQP_VHOST (default /)
|
||||
These materialize a synthetic server named "env" that becomes the default.
|
||||
|
||||
Flags:
|
||||
--config PATH Config file path.
|
||||
--server NAME Server alias (overrides 'default').
|
||||
--queue NAME Queue to scan. Required for the scan command.
|
||||
--search TEXT Substring to match against message body. Empty -> print all.
|
||||
--full Include envelope (exchange, routing-key, headers, props).
|
||||
Default prints only the body, pretty-printed if it is JSON.
|
||||
--max-body N Truncate body to N bytes when printing (0 = full). Default 0.
|
||||
--timeout DUR AMQP dial timeout. Default 30s.
|
||||
--json Machine-readable output. Combine with --full to emit a
|
||||
full envelope object per message; without --full, emits
|
||||
one body per line (compact JSON if the body is JSON,
|
||||
otherwise the body as a JSON string).
|
||||
|
||||
Exit status:
|
||||
0 success (any number of matches)
|
||||
1 runtime error
|
||||
2 bad usage
|
||||
`
|
||||
|
||||
func main() {
|
||||
if err := run(os.Args[1:]); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run(args []string) error {
|
||||
if len(args) > 0 {
|
||||
switch args[0] {
|
||||
case "list", "ls":
|
||||
return cmdList(args[1:])
|
||||
case "set-default":
|
||||
return cmdSetDefault(args[1:])
|
||||
case "init":
|
||||
return cmdInit(args[1:])
|
||||
case "help", "-h", "--help":
|
||||
fmt.Print(usage)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return cmdScan(args)
|
||||
}
|
||||
|
||||
func newFlagSet(name string) *flag.FlagSet {
|
||||
fs := flag.NewFlagSet(name, flag.ContinueOnError)
|
||||
fs.SetOutput(os.Stderr)
|
||||
fs.Usage = func() {
|
||||
fmt.Fprint(os.Stderr, usage)
|
||||
}
|
||||
return fs
|
||||
}
|
||||
|
||||
func cmdScan(args []string) error {
|
||||
fs := newFlagSet("scan")
|
||||
configPath := fs.String("config", "", "config file path")
|
||||
server := fs.String("server", "", "server alias")
|
||||
queue := fs.String("queue", "", "queue name")
|
||||
search := fs.String("search", "", "substring to match in body")
|
||||
maxBody := fs.Int("max-body", 0, "truncate body to N bytes (0 = full)")
|
||||
timeout := fs.Duration("timeout", 30*time.Second, "dial timeout")
|
||||
jsonOut := fs.Bool("json", false, "machine-readable JSON output (body only; combine with --full for envelope)")
|
||||
full := fs.Bool("full", false, "print full envelope (exchange, headers, props) instead of body only")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if *queue == "" {
|
||||
fs.Usage()
|
||||
return fmt.Errorf("--queue is required")
|
||||
}
|
||||
|
||||
cfg, err := loadConfig(*configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
name, srv, err := cfg.resolveServer(*server)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origin := "config"
|
||||
if cfg.fromEnv {
|
||||
origin = "env"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "[rb-search] server=%s (%s) queue=%s search=%q\n", name, origin, *queue, *search)
|
||||
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
return scanQueue(ctx, srv.URL, scanOptions{
|
||||
queue: *queue,
|
||||
search: *search,
|
||||
maxBody: *maxBody,
|
||||
timeout: *timeout,
|
||||
out: os.Stdout,
|
||||
diag: os.Stderr,
|
||||
jsonOut: *jsonOut,
|
||||
full: *full,
|
||||
})
|
||||
}
|
||||
|
||||
func cmdList(args []string) error {
|
||||
fs := newFlagSet("list")
|
||||
configPath := fs.String("config", "", "config file path")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
cfg, err := loadConfig(*configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
names := make([]string, 0, len(cfg.Servers))
|
||||
for n := range cfg.Servers {
|
||||
names = append(names, n)
|
||||
}
|
||||
sort.Strings(names)
|
||||
for _, n := range names {
|
||||
marker := " "
|
||||
if n == cfg.Default {
|
||||
marker = "*"
|
||||
}
|
||||
fmt.Printf("%s %-15s %s\n", marker, n, cfg.Servers[n].URL)
|
||||
}
|
||||
if len(names) == 0 {
|
||||
fmt.Println("(no servers configured)")
|
||||
}
|
||||
if cfg.fromEnv {
|
||||
fmt.Fprintln(os.Stderr, "(synthetic config from EVENTBUS_AMQP_* env vars; run `rb-search init` to persist)")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cmdSetDefault(args []string) error {
|
||||
fs := newFlagSet("set-default")
|
||||
configPath := fs.String("config", "", "config file path")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if fs.NArg() != 1 {
|
||||
return fmt.Errorf("usage: rb-search set-default <server-name>")
|
||||
}
|
||||
name := fs.Arg(0)
|
||||
cfg, err := loadConfig(*configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := cfg.Servers[name]; !ok {
|
||||
return fmt.Errorf("unknown server %q", name)
|
||||
}
|
||||
cfg.Default = name
|
||||
if err := cfg.save(); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("default set to %q in %s\n", name, cfg.path)
|
||||
return nil
|
||||
}
|
||||
|
||||
func cmdInit(args []string) error {
|
||||
fs := newFlagSet("init")
|
||||
configPath := fs.String("config", "", "config file path")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
path := *configPath
|
||||
if path == "" {
|
||||
path = defaultConfigPath()
|
||||
}
|
||||
if err := writeSampleConfig(path); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("wrote sample config to %s\n", path)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user