You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm trying to recreate this system (from the ZMQ Guide), but I can't get it to work:
I've got 3 programs: a publisher in pub/main.go, a proxy in proxy/main.go, and a subscriber in sub/main.go. Below are full codes of these programs:
pub
package main
import (
"flag""fmt""log""time""github.com/docker/distribution/context"
zmq "github.com/go-zeromq/zmq4"
)
vardefaultMsg=`{`+`"action": 2,`+`"container_id": "0123456789AB"`+`}`var (
port*intmsg*string
)
funcinit() {
port=flag.Int("port", 8080, "port to dial to")
msg=flag.String("msg", defaultMsg, "default message to send")
flag.Parse()
}
funcmain() {
addr:=fmt.Sprintf("tcp://localhost:%d", *port)
socket:=zmq.NewPub(context.Background())
err:=socket.Dial(addr)
iferr!=nil {
log.Fatalf("failed to dial: %v\n", err)
}
defersocket.Close()
log.Printf("listening on %s\n", addr)
for {
msg:=zmq.NewMsgFromString([]string{*msg})
err:=socket.Send(msg)
iferr!=nil {
panic(err) // Interrupted
}
log.Println("message sent")
time.Sleep(2*time.Second) // Wait for 2 seconds
}
}
proxy
package main
import (
"context""flag""fmt""log"
zmq "github.com/go-zeromq/zmq4"
)
var (
pubPort*intsubPort*int
)
funcinit() {
}
funcmain() {
subPort=flag.Int("port-sub", 8080, "port to listen for publisher input on")
pubPort=flag.Int("port-pub", 8081, "port to publish output on")
flag.Parse()
ctx:=context.Background()
subSocket:=zmq.NewXSub(ctx)
subAddr:=fmt.Sprintf("tcp://localhost:%v", *subPort)
err:=subSocket.Listen(subAddr)
iferr!=nil {
log.Fatalf("failed to dial: %v\n", err)
}
err=subSocket.SetOption(zmq.OptionSubscribe, "")
iferr!=nil {
log.Fatalf("failed to set option: %v\n", err)
}
pubSocket:=zmq.NewXPub(ctx)
pubAddr:=fmt.Sprintf("tcp://localhost:%v", *pubPort)
err=pubSocket.Listen(pubAddr)
iferr!=nil {
log.Fatalf("failed to listen: %v\n", err)
}
listener:=zmq.NewPair(ctx)
listener.Listen("inproc://pipe")
err=listener.SetOption(zmq.OptionSubscribe, "")
iferr!=nil {
log.Fatalf("failed to set option: %v\n", err)
}
gofunc() {
for {
log.Println("listener is waiting for a message")
msg, err:=listener.Recv()
iferr!=nil {
log.Fatalf("failed to recv from socket: %v\n", err)
}
b:=msg.Bytes()
fmt.Printf("message: %s, len: %v\n", b, len(b))
}
}()
proxy:=zmq.NewProxy(ctx, subSocket, pubSocket, listener)
log.Printf("listening for publishers on %s\n", subAddr)
log.Printf("listening for subscribers on %s\n", pubAddr)
fmt.Println("running proxy...")
err=proxy.Run()
iferr!=nil {
log.Fatalln(err)
}
}
sub
package main
import (
"context""flag""fmt""log"
zmq "github.com/go-zeromq/zmq4"
)
funcmain() {
port:=flag.Int("port", 8081, "port to listen on")
flag.Parse()
socket:=zmq.NewSub(context.Background())
defersocket.Close()
addr:=fmt.Sprintf("tcp://localhost:%d", *port)
err:=socket.Dial(addr)
iferr!=nil {
log.Fatalf("failed to dial: %v\n", err)
}
err=socket.SetOption(zmq.OptionSubscribe, "")
iferr!=nil {
log.Fatalf("failed to set option: %v\n", err)
}
fmt.Printf("dialing to %s\n", addr)
for {
msg, err:=socket.Recv()
iferr!=nil {
log.Fatalf("failed to recv from socket: %v\n", err)
}
b:=msg.Bytes()
fmt.Printf("message: %s, len: %v\n", b, len(b))
}
}
I'm pretty sure I'm using correct socket types and also enable subscribing all topics using socket.SetOption(zmq.OptionSubscribe, ""). I don't have any more ideas on how to get this simple example to work.
I'm trying to recreate this system (from the ZMQ Guide), but I can't get it to work:
I've got 3 programs: a publisher in
pub/main.go
, a proxy inproxy/main.go
, and a subscriber insub/main.go
. Below are full codes of these programs:pub
proxy
sub
I'm pretty sure I'm using correct socket types and also enable subscribing all topics using
socket.SetOption(zmq.OptionSubscribe, "")
. I don't have any more ideas on how to get this simple example to work.Might be related to #108
The text was updated successfully, but these errors were encountered: