[toc]

1.总体架构图

image-20220504143208830

2.carService

2.1service层

car.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package car

import (
"context"
carpb "coolcar/car/api/gen/v1"
"coolcar/car/dao"
"coolcar/shared/id"

"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Service struct {
Mongo *dao.Mongo
Logger *zap.Logger
}

func (s *Service) CreateCar(c context.Context, req *carpb.CreateCarRequest) (*carpb.CarEntity, error) {
cr, err := s.Mongo.CreateCar(c)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &carpb.CarEntity{
Id: cr.ID.Hex(),
Car: cr.Car,
}, nil
}
func (s *Service) GetCar(c context.Context, req *carpb.GetCarRequest) (*carpb.Car, error) {
cr, err := s.Mongo.GetCar(c, id.CarID(req.Id))
if err != nil {
return nil, status.Error(codes.NotFound, "")
}
return cr.Car, nil
}
func (s *Service) GetCars(c context.Context, req *carpb.GetCarsRequest) (*carpb.GetCarsResponse, error) {
cars, err := s.Mongo.GetCars(c)
if err != nil {
s.Logger.Error("cannot found cars", zap.Error(err))
return nil, status.Error(codes.Internal, "")
}
res := &carpb.GetCarsResponse{}
for _, cr := range cars {
res.Cars = append(res.Cars, &carpb.CarEntity{

Car: cr.Car,
})
}
return res, nil
}
func (s *Service) LockCar(c context.Context, req *carpb.LockCarRequest) (*carpb.LockCarResponse, error) {
_, err := s.Mongo.UpdateCar(c, id.CarID(req.Id), carpb.CarStatus_UNLOCKED, &dao.CarUpdate{
Status: carpb.CarStatus_LOCKING,
})
if err != nil {
code := codes.Internal
if err == mongo.ErrNoDocuments {
code = codes.NotFound
}
return nil, status.Errorf(code, "cannot update: %v", err)
}
return &carpb.LockCarResponse{}, nil
}
func (s *Service) UnlockCar(c context.Context, req *carpb.UnlockCarRequest) (*carpb.UnlockCarResponse, error) {
_, err := s.Mongo.UpdateCar(c, id.CarID(req.Id), carpb.CarStatus_LOCKED, &dao.CarUpdate{
Status: carpb.CarStatus_UNLOCKING,
Driver: req.Driver,
UpdateTripID: true,
TripID: id.TripID(req.TripId),
})
if err != nil {
code := codes.Internal
if err == mongo.ErrNoDocuments {
code = codes.NotFound
}
return nil, status.Errorf(code, "cannot update: %v", err)
}
return &carpb.UnlockCarResponse{}, nil
}

// 是车上的软件来调用的
func (s *Service) UpdateCar(c context.Context, req *carpb.UpdateCarRequest) (*carpb.UpdateCarResponse, error) {
update := &dao.CarUpdate{
Status: req.Status,
Position: req.Position,
}

if req.Status == carpb.CarStatus_LOCKED {
update.Driver = &carpb.Driver{}
update.TripID = id.TripID("")
update.UpdateTripID = true
}
_, err := s.Mongo.UpdateCar(c, id.CarID(req.Id), carpb.CarStatus_CS_NOT_SPECIFIED, update)
if err != nil {
code := codes.Internal
if err == mongo.ErrNoDocuments {
code = codes.NotFound
}
return nil, status.Errorf(code, "cannot update: %v", err)
}
return &carpb.UpdateCarResponse{}, nil
}

2.2dao层mongo.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package dao

import (
"context"
carpb "coolcar/car/api/gen/v1"
"coolcar/shared/id"
mgo "coolcar/shared/mongo"
"coolcar/shared/mongo/objid"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

const (
carField = "car"
statusField = carField + ".status"
driverField = carField + ".driver"
positionField = carField + ".position"
tripIdField = carField + ".tripid"
)

type Mongo struct {
col *mongo.Collection
}
type CarUpdate struct {
Status carpb.CarStatus
Position *carpb.Location
Driver *carpb.Driver
UpdateTripID bool
TripID id.TripID
}
type CarRecord struct {
mgo.IDField `bson:"inline"`
Car *carpb.Car `bson:"car"`
}

func NewMongo(db *mongo.Database) *Mongo {
return &Mongo{
col: db.Collection("car"),
}
}
func (m *Mongo) CreateCar(c context.Context) (*CarRecord, error) {
r := &CarRecord{
Car: &carpb.Car{
Position: &carpb.Location{
Latitude: 30,
Longitude: 120,
},
Status: carpb.CarStatus_LOCKED,
},
}
r.ID = mgo.NewObjID()
_, err := m.col.InsertOne(c, r)
if err != nil {
return nil, err
}
return r, nil
}
func (m *Mongo) GetCar(c context.Context, carId id.CarID) (*CarRecord, error) {
objId, err := objid.FromID(carId)
if err != nil {
return nil, fmt.Errorf("invalid id: %v", err)
}
res := m.col.FindOne(c, bson.M{
mgo.IDFieldName: objId,
})
if err := res.Err(); err != nil {
return nil, err
}
var cr CarRecord
err = res.Decode(&cr)
if err != nil {
return nil, fmt.Errorf("cannot decode : %v,", err)
}
return &cr, nil
}
func (m *Mongo) GetCars(c context.Context) ([]*CarRecord, error) {
filter := bson.M{}
res, err := m.col.Find(c, filter, options.Find())
if err != nil {
return nil, err
}
var cars []*CarRecord
for res.Next(c) {
var cr CarRecord
err = res.Decode(&cr)
if err != nil {
return nil, fmt.Errorf("cannot decode : %v,", err)
}
cars = append(cars, &cr)
}
return cars, nil
}

func (m *Mongo) UpdateCar(c context.Context, carID id.CarID, status carpb.CarStatus, car *CarUpdate) (*CarRecord, error) {
objID, err := objid.FromID(carID)
if err != nil {
return nil, fmt.Errorf("invalid id: %v", err)
}
filter := bson.M{
mgo.IDFieldName: objID,
}
if status != carpb.CarStatus_CS_NOT_SPECIFIED {
filter[statusField] = status
}
u := bson.M{}
if car.Status != carpb.CarStatus_CS_NOT_SPECIFIED {
u[statusField] = car.Status
}
if car.Driver != nil {
u[driverField] = car.Driver
}
if car.Position != nil {
u[positionField] = car.Position
}
if car.UpdateTripID {
u[tripIdField] = car.TripID.String()
}

res := m.col.FindOneAndUpdate(c, filter, mgo.Set(u), options.FindOneAndUpdate().SetReturnDocument(options.After))
return convertSingleResult(res)
}

func convertSingleResult(res *mongo.SingleResult) (*CarRecord, error) {
if err := res.Err(); err != nil {
return nil, err
}

var cr CarRecord
err := res.Decode(&cr)
if err != nil {
return nil, fmt.Errorf("cannot decode: %v", err)
}
return &cr, nil
}

2.3注册main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"context"
carpb "coolcar/car/api/gen/v1"
"coolcar/car/car"
"coolcar/car/dao"
"coolcar/shared/server"
"log"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"google.golang.org/grpc"
)

func main() {
logger, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("cannot create logger : %v", err)
}
//1.获取GRPC服务
c := context.Background()
mgURI := "mongodb://localhost:27017/coolcar?readPreference=primary&ssl=false"
mongoClient, err := mongo.Connect(c, options.Client().ApplyURI(mgURI))
if err != nil {
logger.Fatal("cannot connect mongodb: %v", zap.Error(err))
}
db := mongoClient.Database("coolcar")
err = server.RunGRPCServer(&server.GRPCConfig{
Name: "car",
Addr: ":8084",
Logger: logger,
RegisterFunc: func(s *grpc.Server) {
carpb.RegisterCarServiceServer(s, &car.Service{
Mongo: dao.NewMongo(db),
Logger: logger,
})
},
})
if err != nil {
logger.Fatal("cannot server: %v", zap.Error(err))
}
}

2.4测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package car

import (
"context"
carpb "coolcar/car/api/gen/v1"
"coolcar/car/dao"
"coolcar/shared/id"
mgo "coolcar/shared/mongo"
mongotesting "coolcar/shared/mongo/testing"
"encoding/json"
"fmt"
"os"
"testing"

"go.uber.org/zap"
)

func TestCarUpdate(t *testing.T) {
c := context.Background()
mc, err := mongotesting.NewClient(c)
if err != nil {
t.Fatalf("cannot create mongo client: %v", err)
}
logger, err := zap.NewDevelopment()
if err != nil {
t.Fatalf("cannot create logger : %v", err)
}
s := &Service{
Logger: logger,
Mongo: dao.NewMongo(mc.Database("coolcar")),
}
carID := id.CarID("521c60186800fc9e2ca14801")
mgo.NewObjIDWithValue(carID)
car1, err := s.CreateCar(c, &carpb.CreateCarRequest{})
if err != nil {
t.Fatalf("cannot create car: %v", err)
}
fmt.Println(car1)
cases := []struct {
name string
op func() error
want string
wantErr bool
}{
{
name: "get_car",
op: func() error {
return nil
},
want: `{"status":1,"position":{"latitude":30,"longitude":120}}`,
},
{
name: "unlock_car",
op: func() error {
_, err := s.UnlockCar(c, &carpb.UnlockCarRequest{
Id: carID.String(),
TripId: "test_tripID",
Driver: &carpb.Driver{
Id: "test_driver",
AvatarUrl: "test_avatarUrl",
},
})
return err
},
want: `{"status":2,"driver":{"id":"test_driver","avatar_url":"test_avatarUrl"},"position":{"latitude":30,"longitude":120},"trip_id":"test_tripID"}`,
},
{
name: "unlock_complete",
op: func() error {
_, err := s.UpdateCar(c, &carpb.UpdateCarRequest{
Id: carID.String(),
Position: &carpb.Location{
Latitude: 31,
Longitude: 120,
},
Status: carpb.CarStatus_UNLOCKED,
})
return err
},
want: `{"status":3,"driver":{"id":"test_driver","avatar_url":"test_avatarUrl"},"position":{"latitude":31,"longitude":120},"trip_id":"test_tripID"}`,
},
{
name: "unlock_car_by_another_driver",
op: func() error {
_, err := s.UnlockCar(c, &carpb.UnlockCarRequest{
Id: carID.String(),
TripId: "bad_tripID",
Driver: &carpb.Driver{
Id: "bad_driver",
AvatarUrl: "bad_avatarUrl",
},
})
return err
},
wantErr: true,
},
{
name: "lock_car",
op: func() error {
_, err := s.LockCar(c, &carpb.LockCarRequest{
Id: carID.String(),
})
return err
},
want: `{"status":4,"driver":{"id":"test_driver","avatar_url":"test_avatarUrl"},"position":{"latitude":31,"longitude":120},"trip_id":"test_tripID"}`,
},
{
name: "lock_complete",
op: func() error {
_, err := s.UpdateCar(c, &carpb.UpdateCarRequest{
Id: carID.String(),
Status: carpb.CarStatus_LOCKED,
})
return err
},
want: `{"status":1,"driver":{},"position":{"latitude":31,"longitude":120}}`,
},
}

for _, cc := range cases {
err := cc.op()
if cc.wantErr {
if err == nil {
t.Errorf("%s: want error; got node", cc.name)
} else {
continue
}
}
car, err := s.GetCar(c, &carpb.GetCarRequest{
Id: carID.String(),
})
if err != nil {
t.Errorf("%s: operation faild: %v", cc.name, err)
continue
}
b, err := json.Marshal(car)
if err != nil {
t.Errorf("%s:faild marshal response: %v", cc.name, err)
}
got := string(b)
if cc.want != got {
t.Errorf("%s:incorrect response; want: %s, got: %s", cc.name, cc.want, got)
}
}
}
func TestMain(m *testing.M) {
os.Exit(mongotesting.RunWithMongoInDocker(m))
}

2.5在tripservice中完成车辆管理代码

  1. 修改trip.go的代码
1
2
3
4
5
6
7
type CarManager interface {
// 加入人的位置
// Verfigy来判断车可不可以被开锁
Verfigy(c context.Context, cid id.CarID, loc *rentalpb.Location) error
Unlock(c context.Context, cid id.CarID, aid id.AccountID, tid id.TripID, avatarURL string) error
Lock(c context.Context, cid id.CarID) error
}
  1. /trip/client/car/car.go实现上面三个函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package car

import (
"context"
carpb "coolcar/car/api/gen/v1"
rentalpb "coolcar/rental/api/gen/v1"
"coolcar/shared/id"
"fmt"
)

type Manager struct {
CarService carpb.CarServiceClient
}

func (m *Manager) Verfigy(c context.Context, cid id.CarID, loc *rentalpb.Location) error {
car, err := m.CarService.GetCar(c, &carpb.GetCarRequest{
Id: cid.String(),
})
if err != nil {
return fmt.Errorf("cannot get car: %v", err)
}
if car.Status != carpb.CarStatus_LOCKED {
return fmt.Errorf("cannot unlock; status is %v", car.Status)
}
return nil
}
func (m *Manager) Unlock(c context.Context, cid id.CarID, aid id.AccountID, tid id.TripID, avatarURL string) error {
_, err := m.CarService.UnlockCar(c, &carpb.UnlockCarRequest{
Id: cid.String(),
Driver: &carpb.Driver{
Id: aid.String(),
AvatarUrl: avatarURL,
},
TripId: tid.String(),
})
if err != nil {
return fmt.Errorf("cannot Unlock the car: %v", err)
}
return nil
}
func (m *Manager) Lock(c context.Context, cid id.CarID) error {
_, err := m.CarService.LockCar(c, &carpb.LockCarRequest{
Id: cid.String(),
})
if err != nil {
return fmt.Errorf("cannot Lock: %v", err)
}
return nil
}
  1. trip/main.go注册,和carservice通信.
1
2
3
4
carConn, err := grpc.Dial("localhost:8084", grpc.WithInsecure())
if err != nil {
logger.Fatal("cannot connect car service", zap.Error(err))
}

3.使用RabbitMQ来实现汽车状态更新的发布

3.1.在CarService中定义并使用Publisher

  1. 定义Publisher函数,然后注册
1
2
3
4
5
6
7
8
type Publisher interface{
Publish(context.Context, *carpb.CarEntity) error
}
type Service struct {
Mongo *dao.Mongo
Logger *zap.Logger
Publisher Publisher
}
  1. 需要跟新的有LockCar,UnlockCar,UpdateCar在跟新数据库的后面加上s.publish(c,car)
1
2
3
4
5
6
7
8
9
func (s *Service) publish(c context.Context, car *dao.CarRecord){
err := s.Publisher.Publish(c,&carpb.CarEntity{
Id: car.ID.Hex(),
Car: car.Car,
})
if err != nil {
s.Logger.Warn("cannot publish",zap.Error(err))
}
}

3.2.实现Publisher函数

  1. 建立/car/amqpclient/amqp.go.在这里面不声明connection的URL,统一放在注册函数里面.方便管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package amqpclient

import (
"context"
carpb "coolcar/car/api/gen/v1"
"encoding/json"
"fmt"

"github.com/streadway/amqp"
)

type Publisher struct {
ch *amqp.Channel
exchange string
}

func NewPublisher(conn *amqp.Connection, ex string)(*Publisher, error){
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("cannot allocate channel: %v", err)
}
err = ch.ExchangeDeclare(
ex,
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("cannot declare exchange: %v",err)
}
return &Publisher{
ch: ch,
exchange: ex,
},nil
}

func (p *Publisher) Publish(c context.Context, car *carpb.CarEntity) error {
b, err := json.Marshal(car)
if err != nil {
return fmt.Errorf("cannot marshal: %v", err)
}
return p.ch.Publish(
p.exchange,
"",
false,
false,
amqp.Publishing{
Body: b,
},
)
}
  1. /car/main.go注册amqp连接
1
2
3
4
amqpConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
logger.Fatal("cannot dial amqp", zap.Error(err))
}

3.3.前后端联调

  1. 模拟创建汽车.建立/cmd/carclient/main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"context"
carpb "coolcar/car/api/gen/v1"
"fmt"

"google.golang.org/grpc"
)

func main() {
conn, err := grpc.Dial("localhost:8084", grpc.WithInsecure())
if err != nil {
panic(err)
}
cs := carpb.NewCarServiceClient(conn)
c := context.Background()
for i := 0; i < 5; i++ {
res, err := cs.CreateCar(c, &carpb.CreateCarRequest{})
if err != nil {
panic(err)
}
fmt.Printf("create car: %s\n", res.Id)
}
}

4.模拟汽车

4.1.汽车状态模拟

  1. 建立/car/sim/sim.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package sim

import (
"context"
carpb "coolcar/car/api/gen/v1"
"time"

"go.uber.org/zap"
)

//从消息队列收消息
type Subscriber interface {
Subscribe(context.Context) (ch chan *carpb.CarEntity, cleanUp func(), err error)
}
type Controller struct {
CarService carpb.CarServiceClient
Subscriber Subscriber
Logger *zap.Logger
}

func (c *Controller) RunSimulations(ctx context.Context) {
//针对每一台汽车模拟
//1.获取所有车
var cars []*carpb.CarEntity
for {
time.Sleep(3 * time.Second)
res, err := c.CarService.GetCars(ctx, &carpb.GetCarsRequest{})
if err != nil {
c.Logger.Error("cannot get cars", zap.Error(err))
continue
}
cars = res.Cars
break
}

c.Logger.Info("Runing car simulations.", zap.Int("car_count", len(cars)))
msgCh, cleanUp, err := c.Subscriber.Subscribe(ctx)
defer cleanUp()
if err != nil {
c.Logger.Error("cannot subscribe", zap.Error(err))
return
}
//2.goruting和carid对应
carChans := make(map[string]chan *carpb.Car)
for _, car := range cars {
ch := make(chan *carpb.Car)
carChans[car.Id] = ch
go c.SimulateCar(context.Background(), car, ch)
}

// 3. 收消息
for carUpdate := range msgCh {
ch := carChans[carUpdate.Id]
if ch != nil {
ch <- carUpdate.Car
}
}
}

func (c *Controller) SimulateCar(ctx context.Context, initial *carpb.CarEntity, ch chan *carpb.Car) {
carID := initial.Id
c.Logger.Info("simulations car.", zap.String("id", carID))

for update := range ch {
if update.Status == carpb.CarStatus_UNLOCKING {
_, err := c.CarService.UpdateCar(ctx, &carpb.UpdateCarRequest{
Id: carID,
Status: carpb.CarStatus_UNLOCKED,
})
if err != nil {
c.Logger.Error("cannot unlock car", zap.Error(err))
}
} else if update.Status == carpb.CarStatus_LOCKING {
_, err := c.CarService.UpdateCar(ctx, &carpb.UpdateCarRequest{
Id: carID,
Status: carpb.CarStatus_LOCKED,
})
if err != nil {
c.Logger.Error("cannot lock car", zap.Error(err))
}
}
}
}
  1. /car/amqpClient/amqp.go加入下面代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122

type Subscriber struct {
conn *amqp.Connection
exchange string
logger *zap.Logger
}
//初始化函数
func NewSubscriber(conn *amqp.Connection, exchange string, logger *zap.Logger) (*Subscriber, error) {
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("cannot allocate channel: %v", err)
}
// 这里创建之后就可以close掉,
defer ch.Close()
err = declareExchange(ch, exchange)
if err != nil {
return nil, fmt.Errorf("cannot declare exchange: %v", err)
}
return &Subscriber{
conn: conn,
exchange: exchange,
logger: logger,
}, nil
}

// 过度函数,主要是将amqp.Delivery转化为*carpb.CarEntity
func (s *Subscriber) Subscribe(c context.Context) (chan *carpb.CarEntity, func(), error) {
masCh, cleanUp, err := s.SubscribeRaw(c)
if err != nil {
return nil, cleanUp, err
}
carCh := make(chan *carpb.CarEntity)
go func() {
for msg := range masCh {
var car carpb.CarEntity
err := json.Unmarshal(msg.Body, &car)
if err != nil {
s.logger.Error("cannot unmarshal", zap.Error(err))
}
carCh <- &car
}
close(carCh)
}()
return carCh, cleanUp, nil

}

// 模拟消息队列和消费,因为函数中包括了消费,在实践中可以登录http://127.0.0.1:15672/来bind的一个测试队列。
func (s *Subscriber) SubscribeRaw(c context.Context) (<-chan amqp.Delivery, func(), error) {
ch, err := s.conn.Channel()
if err != nil {
return nil, func() {}, fmt.Errorf("cannot allocate channle,%v", err)
}
// 将清理的函数返回出去,让使用的人操作。
closeCh := func() {
err := ch.Close()
if err != nil {
s.logger.Error("cannot close channel", zap.Error(err))
}
}
q, err := ch.QueueDeclare(
"", //name
false, //furable
true, // autoDelete
false, //exclusive
false, // nowait
nil, // args
)

if err != nil {
return nil, closeCh, fmt.Errorf("cannot declare queue: %v", err)
}

cleanUp := func() {
_, err := ch.QueueDelete(
q.Name,
false, //
false,
false,
)
if err != nil {
s.logger.Error("cannot QueueDelete", zap.Error(err))
}
closeCh()
}
ch.QueueBind(
q.Name,
"", //key
s.exchange,
false, //nowait
nil, // args
)

if err != nil {
return nil, cleanUp, fmt.Errorf("cannot bind queue: %v", err)
}
msgs, err := ch.Consume(
q.Name,
"", // consumer
true, //autoAck
false, // exclusive
false, //noLocal
false, //noWarit
nil, // args
)
if err != nil {
return nil, cleanUp, fmt.Errorf("cannot consume queue: %v", err)
}
return msgs, cleanUp, nil
}

func declareExchange(ch *amqp.Channel, exchange string) error {
return ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
}
  1. 在main函数中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 后台模拟
// 自己连自己,grpc.Dial在发请求的时候连接。并不是Dial立马连接。
carConn, err := grpc.Dial("localhost:8084", grpc.WithInsecure())
if err != nil {
logger.Fatal("cannot connection carservice", zap.Error(err))
}
sub, err := amqpclient.NewSubscriber(amqpConn, "coolcar", logger)
if err != nil {
logger.Fatal("cannot create subscriber", zap.Error(err))
}
simController := &sim.Controller{
CarService: carpb.NewCarServiceClient(carConn),
Subscriber: sub,
Logger: logger,
}
go simController.RunSimulations(context.Background())

4.2.汽车位置模拟

创建一个Ai微服务模拟汽车的位置更新。

微服务目录:

image-20220509172535487

和其他的微服务大致相同,位置模拟逻辑在ai.go,将模拟的数据发到pos_sim的消息队列的Exchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package ai

import (
"context"
aipb "coolcar/ai/api/gen/v1"
carpb "coolcar/car/api/gen/v1"
"coolcar/car/mq"
"math"
"time"

"go.uber.org/zap"
)

type Service struct {
Logger *zap.Logger
Publisher mq.Publisher
}

var carMap = make(map[string]bool, 0)

func (s *Service) LicIdentity(c context.Context, req *aipb.LicIdentityRequest) (*aipb.Identity, error) {
return &aipb.Identity{
LicNumber: "12389721398791",
Name: "谭谭",
Gender: aipb.Gender_MALE,
BirthDateMillis: 23498237948237,
}, nil
}
func (s *Service) MeasureDistance(c context.Context, req *aipb.MeasureDistanceRequest) (*aipb.MeasureDistanceResponse, error) {
start, end := req.From, req.To
dis := math.Sqrt(math.Pow((end.Latitude-start.Latitude), 2)+math.Pow((end.Longitude-end.Longitude), 2)) * 111
return &aipb.MeasureDistanceResponse{
DistanceKm: float32(dis),
}, nil
}
func (s *Service) SimulateCarPos(c context.Context, req *aipb.SimulateCarPosRequest) (*aipb.SimulateCarPosResponse, error) {
carMap[req.CarId] = true
go func(cm map[string]bool) {
for cm[req.CarId] {
req.Pos.Latitude += 0.0001
req.Pos.Longitude += 0.00005
err := s.Publisher.Publish(c, &carpb.CarEntity{
Id: req.CarId,
Car: &carpb.Car{
Position: &carpb.Location{
Latitude: req.Pos.Latitude,
Longitude: req.Pos.Longitude,
},
},
})
if err != nil {
s.Logger.Warn("cannot publish", zap.Error(err))
}
time.Sleep(1 * time.Second)
}
}(carMap)
return &aipb.SimulateCarPosResponse{}, nil
}
func (s *Service) EndSimulateCarPos(c context.Context, req *aipb.EndSimulateCarPosRequest) (*aipb.EndSimulateCarPosResponse, error) {
carMap[req.CarId] = false
s.Publisher.Publish(c, &carpb.CarEntity{
Id: req.CarId,
Car: &carpb.Car{
Status: carpb.CarStatus_LOCKED,
},
})
return &aipb.EndSimulateCarPosResponse{}, nil
}

ai.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
syntax = "proto3";
package ai.v1;
option go_package = "coolcar/ai/api/gen/v1;aipb";

enum Gender{
G_NOT_SPECIFIED = 0;
FEMALE = 1;
MALE = 2;
}
message Location {
double latitude = 1;
double longitude = 2;
}
message Identity{
string lic_number=1;
string name = 2;
Gender gender = 3;
int64 birth_date_millis = 4;
}
message LicIdentityRequest{
bytes photo=1;
bool real_ai=2;
}
message MeasureDistanceRequest{
Location from = 1;
Location to=2;
}
message MeasureDistanceResponse{
float distance_km=1;
}
message SimulateCarPosRequest{
string car_id=1;
Location pos=2;
}
message SimulateCarPosResponse{}
message EndSimulateCarPosRequest{
string car_id=1;
}
message CarPosUpdate{
string car_id=1;
Location pos=2;
}
message EndSimulateCarPosResponse{}
service AiService{
rpc LicIdentity (LicIdentityRequest) returns(Identity);
rpc MeasureDistance (MeasureDistanceRequest) returns(MeasureDistanceResponse);
rpc SimulateCarPos (SimulateCarPosRequest) returns(SimulateCarPosResponse);
rpc EndSimulateCarPos (EndSimulateCarPosRequest) returns(EndSimulateCarPosResponse);
}

4.3模拟代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package sim

import (
"context"
aipb "coolcar/ai/api/gen/v1"
carpb "coolcar/car/api/gen/v1"
"coolcar/car/mq"
"fmt"
"time"

"go.uber.org/zap"
)

type Controller struct {
CarService carpb.CarServiceClient
Aiservice aipb.AiServiceClient
CarSubscriber mq.Subscriber
PosSubscriber mq.Subscriber
Logger *zap.Logger
}

func (c *Controller) RunSimulations(ctx context.Context) {
//针对每一台汽车模拟
//1.获取所有车
var cars []*carpb.CarEntity
for {
time.Sleep(3 * time.Second)
res, err := c.CarService.GetCars(ctx, &carpb.GetCarsRequest{})
if err != nil {
c.Logger.Error("cannot get cars", zap.Error(err))
continue
}
cars = res.Cars
break
}

c.Logger.Info("Runing car simulations.", zap.Int("car_count", len(cars)))
// car
carCh, carcleanUp, err := c.CarSubscriber.Subscribe(ctx)
defer carcleanUp()
if err != nil {
c.Logger.Error("cannot subscribe car", zap.Error(err))
return
}
// pos
posCh, poscleanUp, err := c.PosSubscriber.Subscribe(ctx)
defer poscleanUp()
if err != nil {
c.Logger.Error("cannot subscribe position", zap.Error(err))
return
}
//2.gorutining和carid对应
//car
carChans := make(map[string]chan *carpb.Car)
posChans := make(map[string]chan *carpb.Location)
for _, car := range cars {
carFanoutCh := make(chan *carpb.Car)
carChans[car.Id] = carFanoutCh
posFanoutCh := make(chan *carpb.Location)
posChans[car.Id] = posFanoutCh
go c.SimulateCar(context.Background(), car, carFanoutCh, posFanoutCh)
}

// 3. 收消息
for {
select {
case carUpdate := <-carCh:
ch := carChans[carUpdate.Id]
if ch != nil {
ch <- carUpdate.Car
}
case posUpdate := <-posCh:
ch := posChans[posUpdate.Id]
if ch != nil && posUpdate.Car.Position != nil {
ch <- &carpb.Location{
Latitude: posUpdate.Car.Position.Latitude,
Longitude: posUpdate.Car.Position.Longitude,
}
}
}
}
}

func (c *Controller) SimulateCar(ctx context.Context, initial *carpb.CarEntity, carCh chan *carpb.Car, posCh chan *carpb.Location) {
car := initial
c.Logger.Info("simulations car.", zap.String("id", car.Id))
for {
select {
case update := <-carCh:
if update.Status == carpb.CarStatus_UNLOCKING {
updated, err := c.unlockCar(ctx, car)
if err != nil {
c.Logger.Error("cannot unlock car", zap.Error(err))
break
}
car = updated
} else if update.Status == carpb.CarStatus_LOCKING {
updated, err := c.lockCar(ctx, car)
if err != nil {
c.Logger.Error("cannot lock car", zap.Error(err))
break
}
car = updated
}
case pos := <-posCh:
updated, err := c.moveCar(ctx, car, pos)
if err != nil {
c.Logger.Error("cannot move car", zap.Error(err))
break
}
car = updated
}
}
}

func (c *Controller) lockCar(ctx context.Context, car *carpb.CarEntity) (*carpb.CarEntity, error) {
car.Car.Status = carpb.CarStatus_LOCKED
_, err := c.CarService.UpdateCar(ctx, &carpb.UpdateCarRequest{
Id: car.Id,
Status: car.Car.Status,
})
if err != nil {
return nil, fmt.Errorf("cannot lock car:%v", err)
}
_, err = c.Aiservice.EndSimulateCarPos(ctx, &aipb.EndSimulateCarPosRequest{
CarId: car.Id,
})
if err != nil {
return nil, fmt.Errorf("cannot EndSimulateCarPos:%v", err)
}
return car, nil
}

func (c *Controller) unlockCar(ctx context.Context, car *carpb.CarEntity) (*carpb.CarEntity, error) {
car.Car.Status = carpb.CarStatus_UNLOCKED
_, err := c.CarService.UpdateCar(ctx, &carpb.UpdateCarRequest{
Id: car.Id,
Status: car.Car.Status,
})
if err != nil {
return nil, fmt.Errorf("cannot Unlock car:%v", err)
}
_, err = c.Aiservice.SimulateCarPos(ctx, &aipb.SimulateCarPosRequest{
CarId: car.Id,
Pos: &aipb.Location{
Latitude: car.Car.Position.Latitude,
Longitude: car.Car.Position.Longitude,
},
})
if err != nil {
c.Logger.Error("cannot SimulateCarPos", zap.Error(err))
}
return car, nil
}
func (c *Controller) moveCar(ctx context.Context, car *carpb.CarEntity, pos *carpb.Location) (*carpb.CarEntity, error) {
car.Car.Position = pos
_, err := c.CarService.UpdateCar(ctx, &carpb.UpdateCarRequest{
Id: car.Id,
Position: pos,
})
if err != nil {
return nil, fmt.Errorf("cannot update car:%v", err)
}
return car, nil
}

5.Websocket

而在websocket出现之前,开发实时web应用的方式为轮询,不停地向服务器发送 HTTP 请求,问有没有数据,有数据的话服务器就用响应报文回应。如果轮询的频率比较高,那么就可以近似地实现“实时通信”的效果轮询的缺点也很明显,反复发送无效查询请求耗费了大量的带宽和 CPU资源

WebSocket,是一种网络传输协议,位于OSI模型的应用层。可在单个TCP连接上进行全双工通信(通信允许数据在两个方向上同时传输),能更好的节省服务器资源和带宽并达到实时通迅。客户端和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输

应用场景:弹幕,媒体聊天,协同编辑,基于位置的应用,体育实况更新,股票基金报价实时更新

image-20220507122851526

5.1实践操作

1.引进包

1
go get github.com/gorilla/websocket

2.创建/cmd/websocket/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main

import (
"fmt"
"log"
"net/http"
"strconv"
"time"

"github.com/gorilla/websocket"
)

func main() {
http.HandleFunc("/ws", handleWebSocket)
log.Fatal(http.ListenAndServe(":9090", nil))
}
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
// 将http连接升级为websocket连接
u := &websocket.Upgrader{
// 来自什么的请求都可以
CheckOrigin: func(r *http.Request) bool {
return true
},
}
c, err := u.Upgrade(w, r, nil)
defer c.Close()
if err != nil {
fmt.Printf("cannot upgrade %v\n", err)
return
}

// 这个channl来记录是否有err出现
done := make(chan struct{})
//收消息
go func() {
for {
m := make(map[string]interface{})
err := c.ReadJSON(&m)
if err != nil {
if !websocket.IsCloseError(err,
websocket.CloseGoingAway,
websocket.CloseNormalClosure) {
fmt.Printf("unexpected read error: %v\n", err)
}
// 向done写入一个错误标记
done <- struct{}{}
break
}
fmt.Printf("message received : %v\n", m)
}

}()
i := 0
for {
i++
err = c.WriteJSON(map[string]string{
"hello": "websocket",
"msg_id": strconv.Itoa(i),
})
if err != nil {
fmt.Printf("cannot WriteJSON: %v\n", err)
}
select {
case <-time.After(200 * time.Millisecond):
//有错误就退出
case <-done:
return
}
time.Sleep(200 * time.Millisecond)
}
}

3.前端使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// websocket收消息
this.socket = wx.connectSocket({
url:"ws://localhost:9090/ws"
})
let msgReceived = 0
this.socket.onMessage(msg =>{
msgReceived++
console.log(msg)
})
// websocket 发消息
setInterval(() =>{
this.socket?.send({
data: JSON.stringify({
msg_received: msgReceived,
})
})
},3000)

5.2给汽车服务加websocket

  1. 创建/car/ws/ws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package ws

import (
"context"
"coolcar/car/mq"
"net/http"

"github.com/gorilla/websocket"
"go.uber.org/zap"
)

// Handler creates a websocket http handler
func Handler(u *websocket.Upgrader, sub mq.Subscriber, logger *zap.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
c, err := u.Upgrade(w, r, nil)
defer c.Close()
if err != nil {
logger.Warn("cannot upgrade", zap.Error(err))
return
}
msgs, cleanUp, err := sub.Subscribe(context.Background())
defer cleanUp()
if err != nil {
logger.Error("cannot subscribe", zap.Error(err))
//返回给用户错误
w.WriteHeader(http.StatusInternalServerError)
return
}
// 这个channl来记录是否有err出现
done := make(chan struct{})
//收消息
go func() {
for {
_, _, err := c.ReadMessage()
if err != nil {
if !websocket.IsCloseError(err,
websocket.CloseGoingAway,
websocket.CloseNormalClosure) {
logger.Warn("unexpected read error", zap.Error(err))
}
// 向done写入一个错误标记
done <- struct{}{}
break
}
}

}()
// 发消息:将从RabbitMQ中受到消息转出去。
for {
select {
case msg := <-msgs:
//收到了信息,就发给websocket客户端
err = c.WriteJSON(msg)
if err != nil {
logger.Warn("cannot write JSON", zap.Error(err))
}
//有错误就退出
case <-done:
return
}
}
}

}
  1. 在main中注册
1
2
3
4
5
6
7
8
9
10
11
//起websocket
u := &websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
http.HandleFunc("/ws", ws.Handler(u, sub, logger))
go func() {
logger.Info("HTTP server started.", zap.String("addr:", "9090"))
logger.Sugar().Fatal(http.ListenAndServe(":9090", nil))
}()
  1. 在前端创建/service/car.ts来结构化通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import camelcaseKeys from "camelcase-keys";
import { car } from "./proto_gen/car/car_pb";
import { Coolcar } from "./request";

export namespace CarService{
export function subscribe(onMsg: (c: car.v1.ICarEntity) => void) {
const socket = wx.connectSocket({
url: Coolcar.wsAddr + '/ws',
fail: console.log,
})
socket.onMessage(msg => {
const obj = JSON.parse(msg.data as string)
onMsg(car.v1.CarEntity.fromObject(
camelcaseKeys(obj, {
deep: true,
})))
})
return socket
}
}

在需要调用的页面

1
2
3
4
// websocket收消息
this.socket = CarService.subscribe(car => {
console.log(car)
})

6.行程更新

6.1内网访问添加头部信息

由于是内部调用,如何在context中假如countId是一个问题。

实现grpc.PerRPCCredentials()中的GetRequestMetadata加入自定义的头部信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package trip

import (
"context"
carpb "coolcar/car/api/gen/v1"
"coolcar/car/mq"
rentalpb "coolcar/rental/api/gen/v1"
"coolcar/shared/auth"
"coolcar/shared/id"

"go.uber.org/zap"
"google.golang.org/grpc"
)

// RunUpdate runs a trip updater
func RunUpdate(sub mq.Subscriber, ts rentalpb.TripServiceClient, logger *zap.Logger) {
ch, cleanUp, err := sub.Subscribe(context.Background())
defer cleanUp()
if err != nil {
logger.Fatal("cannot subscribe", zap.Error(err))
}
for car := range ch {
if car.Car.Status == carpb.CarStatus_UNLOCKED &&
car.Car.TripId != "" && car.Car.Driver.Id != "" {
_, err := ts.UpdateTrip(context.Background(), &rentalpb.UpdateTripRequest{
Id: car.Car.TripId,
Current: &rentalpb.Location{
Latitude: car.Car.Position.Latitude,
Longitude: car.Car.Position.Longitude,
},
}, grpc.PerRPCCredentials(&impersonation{
AccountID: id.AccountID(car.Car.Driver.Id),
}))
if err != nil {
logger.Error("cannot update trip", zap.String("trip_id", car.Car.TripId), zap.Error(err))
}
}
}
}

type impersonation struct {
AccountID id.AccountID
}

func (i *impersonation) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
//ImpersonateAccoutHeader = "impersonate-account-id",在auth中定义的
auth.ImpersonateAccoutHeader: i.AccountID.String(),
}, nil
}

// RequireTransportSecurity indicates whether the credentials requires
// transport security.
func (i *impersonation) RequireTransportSecurity() bool {
return false
}

在car/main.go中配置

1
2
3
4
5
6
7
//Start trip updater
tripConn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
if err != nil {
logger.Fatal("cannot connection tripConn", zap.Error(err))
}
//carSub 订阅的是“coolcar”
go trip.RunUpdate(carSub, rentalpb.NewTripServiceClient(tripConn), logger)

这样就可以访问了。

6.2防止外网入侵

grpc在映射的时候,除了Authorization映射为Authorization其他的都会在前面加上grpcgateway-。比如Accept就会映射为grpcgateway-Accept。另外如果有Grpc-Metadata-xx前缀,就会把这个前缀去掉。

image-20220510120951144

客户端也可以加这个头部信息。

只要加上Grpc-Metadata-impersonate-account-id就可以黑掉服务器。

防止办法,在gateway服务中修改mux,使用runtime.WithIncomingHeaderMatcher()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
mux := runtime.NewServeMux(runtime.WithMarshalerOption(
runtime.MIMEWildcard, &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseEnumNumbers: true,
UseProtoNames: true,
}},
),runtime.WithIncomingHeaderMatcher(func(key string) (string, bool) {
// 敏感信息头部去掉。
if key == runtime.MetadataHeaderPrefix + auth.ImpersonateAccoutHeader{
return "",false
}
return runtime.DefaultHeaderMatcher(key)
}))