Skip to content

Commit 612da12

Browse files
committed
local dev/test env setup for rmq/amqp, build out events/publisher for rabbitMQ, add a subscriber for rabbitMQ
1 parent b200fb8 commit 612da12

File tree

7 files changed

+158
-11
lines changed

7 files changed

+158
-11
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.data
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
services:
2+
rabbitmq:
3+
image: rabbitmq:3-management-alpine
4+
container_name: 'rabbitmq'
5+
ports:
6+
- 5672:5672
7+
- 15672:15672
8+
volumes:
9+
- ~/.data/:/var/lib/rabbitmq/
10+
- ~/.data/:/var/log/rabbitmq

config/config.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type RabbitMQConfig struct {
8989
Immediate bool `mapstructure:"Immediate"`
9090
DeliveryMode int `mapstructure:"DeliveryMode"`
9191
ContentType string `mapstructure:"ContentType"`
92+
AutoAck bool `mapstructure:"AutoAck"`
9293

9394
// Optional
9495
Durable bool `mapstructure:"Durable"`
@@ -183,16 +184,17 @@ func Read() *Config {
183184
viper.SetDefault("notifications.EventsQueue", "https://sqs.us-east-1.amazonaws.com/351249512935/demo-queue")
184185

185186
// Default settings for RabbitMQ
186-
viper.SetDefault("rabbitMQ.URL", "amqp://guest:guest@localhost:5672/")
187-
viper.SetDefault("rabbitMQ.QueueName", "defaultQueue")
188-
viper.SetDefault("rabbitMQ.ExchangeName", "defaultExchange")
189-
viper.SetDefault("rabbitMQ.Durable", true)
190-
viper.SetDefault("rabbitMQ.AutoDelete", false)
191-
viper.SetDefault("rabbitMQ.Exclusive", false)
192-
viper.SetDefault("rabbitMQ.NoWait", false)
193-
viper.SetDefault("rabbitMQ.Mandatory", false)
194-
viper.SetDefault("rabbitMQ.Immediate", false)
195-
viper.SetDefault("rabbitMQ.ContentType", "text/plain")
187+
viper.SetDefault("rmq.URL", "amqp://guest:guest@localhost:5672/")
188+
viper.SetDefault("rmq.QueueName", "defaultQueue")
189+
viper.SetDefault("rmq.ExchangeName", "")
190+
viper.SetDefault("rmq.AutoAck", true)
191+
viper.SetDefault("rmq.Durable", true)
192+
viper.SetDefault("rmq.AutoDelete", false)
193+
viper.SetDefault("rmq.Exclusive", false)
194+
viper.SetDefault("rmq.NoWait", false)
195+
viper.SetDefault("rmq.Mandatory", false)
196+
viper.SetDefault("rmq.Immediate", false)
197+
viper.SetDefault("rmq.ContentType", "text/plain")
196198

197199
// StreamConfig
198200
viper.SetDefault("streams.zeroMQListenAddr", "tcp://127.0.0.1:5558")

events/publishers/rmq/rmq.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package rmq
22

33
import (
4+
"fmt"
5+
46
"github.com/edobtc/cloudkit/config"
57
"github.com/streadway/amqp"
68
)
@@ -14,6 +16,8 @@ type Publisher struct {
1416
func NewPublisher() (*Publisher, error) {
1517
queueName := config.Read().RabbitMQ.QueueName
1618

19+
fmt.Println(config.Read().RabbitMQ.URL)
20+
1721
conn, err := amqp.Dial(config.Read().RabbitMQ.URL)
1822
if err != nil {
1923
return nil, err
@@ -43,6 +47,24 @@ func NewPublisher() (*Publisher, error) {
4347
}, nil
4448
}
4549

50+
func (r *Publisher) Listen() (<-chan amqp.Delivery, error) {
51+
queueName := config.Read().RabbitMQ.QueueName
52+
53+
msgs, err := r.channel.Consume(
54+
queueName, // queue
55+
"", // consumer
56+
true, //config.Read().RabbitMQ.AutoAck, // auto-ack
57+
config.Read().RabbitMQ.Exclusive, // exclusive
58+
false, // no-local
59+
config.Read().RabbitMQ.NoWait, // no-wait
60+
nil, // args
61+
)
62+
if err != nil {
63+
return nil, err
64+
}
65+
return msgs, nil
66+
}
67+
4668
func (r *Publisher) Send(data []byte) error {
4769
return r.channel.Publish(
4870
config.Read().RabbitMQ.ExchangeName, // exchange

events/subscribers/autoload/autoload.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77

88
// conforming implementations
99
eclair "github.com/edobtc/cloudkit/events/subscribers/lightning/eclair/ws"
10+
rmq "github.com/edobtc/cloudkit/events/subscribers/rmq"
1011
zmq "github.com/edobtc/cloudkit/events/subscribers/zmq"
1112
)
1213

1314
// Ensure that the Subscriber interface is properly implemented by the autoloaded subscribers
1415
var _ subscribers.Subscriber = &eclair.EclairSubscriber{}
1516
var _ subscribers.Subscriber = &zmq.Subscriber{}
17+
var _ subscribers.Subscriber = &rmq.RMQSubscriber{}
1618

1719
var (
1820
ErrAdapterNotFound = errors.New("adapter by name not found")
@@ -24,7 +26,10 @@ func NewSubscriber(adapter string) (subscribers.Subscriber, error) {
2426
return eclair.NewSubscriber(), nil
2527
case "zmq":
2628
return zmq.NewSubscriber(), nil
27-
29+
case "rmq":
30+
// NewSubscriber returns an err along
31+
// with the subscriber
32+
return rmq.NewSubscriber()
2833
default:
2934
return nil, ErrAdapterNotFound
3035
}

events/subscribers/rmq/rmq.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package rmq
2+
3+
import (
4+
"github.com/edobtc/cloudkit/config"
5+
"github.com/streadway/amqp"
6+
)
7+
8+
type RMQSubscriber struct {
9+
connection *amqp.Connection
10+
channel *amqp.Channel
11+
queueName string
12+
}
13+
14+
func NewSubscriber() (*RMQSubscriber, error) {
15+
queueName := config.Read().RabbitMQ.QueueName
16+
17+
conn, err := amqp.Dial(config.Read().RabbitMQ.URL)
18+
if err != nil {
19+
return nil, err
20+
}
21+
22+
ch, err := conn.Channel()
23+
if err != nil {
24+
return nil, err
25+
}
26+
27+
_, err = ch.QueueDeclare(
28+
queueName,
29+
config.Read().RabbitMQ.Durable,
30+
config.Read().RabbitMQ.AutoDelete,
31+
config.Read().RabbitMQ.Exclusive,
32+
config.Read().RabbitMQ.NoWait,
33+
nil,
34+
)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
return &RMQSubscriber{
40+
connection: conn,
41+
channel: ch,
42+
queueName: queueName,
43+
}, nil
44+
}
45+
46+
func (r *RMQSubscriber) Start() chan bool {
47+
// Start consuming messages
48+
msgs, err := r.channel.Consume(
49+
r.queueName,
50+
"",
51+
config.Read().RabbitMQ.AutoAck,
52+
config.Read().RabbitMQ.Exclusive,
53+
false,
54+
config.Read().RabbitMQ.NoWait,
55+
nil,
56+
)
57+
if err != nil {
58+
panic(err)
59+
}
60+
61+
done := make(chan bool)
62+
go func() {
63+
for d := range msgs {
64+
// Process message
65+
// Placeholder: Print message to console
66+
println("Received message: ", string(d.Body))
67+
}
68+
done <- true
69+
}()
70+
return done
71+
}
72+
73+
func (r *RMQSubscriber) Detach() error {
74+
if err := r.channel.Close(); err != nil {
75+
return err
76+
}
77+
return r.connection.Close()
78+
}
79+
80+
func (r *RMQSubscriber) Listen() <-chan interface{} {
81+
msgs, err := r.channel.Consume(
82+
r.queueName,
83+
"",
84+
config.Read().RabbitMQ.AutoAck,
85+
config.Read().RabbitMQ.Exclusive,
86+
false,
87+
config.Read().RabbitMQ.NoWait,
88+
nil,
89+
)
90+
if err != nil {
91+
panic(err)
92+
}
93+
94+
output := make(chan interface{})
95+
go func() {
96+
for d := range msgs {
97+
output <- d.Body
98+
}
99+
close(output)
100+
}()
101+
return output
102+
}

events/subscribers/subscriber.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,10 @@ package subscribers
33
type Subscriber interface {
44
Start() chan bool
55
Detach() error
6+
7+
// we need to replace this return interface{}
8+
// with a specific type or a channel that will
9+
// return a structured piece of data eventually
10+
// (io.Reader or a Proto)
611
Listen() <-chan interface{}
712
}

0 commit comments

Comments
 (0)