|
@@ -48,12 +48,14 @@ type Control struct {
|
|
|
// synchronizer for controlled shutdown of writer()
|
|
|
writerShutdown *util.Shutdown
|
|
|
|
|
|
+ // synchronizer for controlled shutdown of reader()
|
|
|
+ readerShutdown *util.Shutdown
|
|
|
+
|
|
|
// synchronizer for controlled shutdown of manager()
|
|
|
managerShutdown *util.Shutdown
|
|
|
|
|
|
// synchronizer for controller shutdown of entire Control
|
|
|
shutdown *util.Shutdown
|
|
|
-
|
|
|
}
|
|
|
|
|
|
func NewControl(ctlConn conn.Conn, authMsg *msg.Auth) {
|
|
@@ -61,15 +63,16 @@ func NewControl(ctlConn conn.Conn, authMsg *msg.Auth) {
|
|
|
|
|
|
// create the object
|
|
|
c := &Control{
|
|
|
- auth: authMsg,
|
|
|
- conn: ctlConn,
|
|
|
- out: make(chan msg.Message),
|
|
|
- in: make(chan msg.Message),
|
|
|
- proxies: make(chan conn.Conn, 10),
|
|
|
- lastPing: time.Now(),
|
|
|
- writerShutdown: util.NewShutdown(),
|
|
|
+ auth: authMsg,
|
|
|
+ conn: ctlConn,
|
|
|
+ out: make(chan msg.Message),
|
|
|
+ in: make(chan msg.Message),
|
|
|
+ proxies: make(chan conn.Conn, 10),
|
|
|
+ lastPing: time.Now(),
|
|
|
+ writerShutdown: util.NewShutdown(),
|
|
|
+ readerShutdown: util.NewShutdown(),
|
|
|
managerShutdown: util.NewShutdown(),
|
|
|
- shutdown: util.NewShutdown(),
|
|
|
+ shutdown: util.NewShutdown(),
|
|
|
}
|
|
|
|
|
|
failAuth := func(e error) {
|
|
@@ -177,7 +180,9 @@ func (c *Control) manager() {
|
|
|
|
|
|
case mRaw, ok := <-c.in:
|
|
|
// c.in closes to indicate shutdown
|
|
|
- if !ok { return }
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
switch m := mRaw.(type) {
|
|
|
case *msg.ReqTunnel:
|
|
@@ -216,13 +221,15 @@ func (c *Control) writer() {
|
|
|
func (c *Control) reader() {
|
|
|
defer func() {
|
|
|
if err := recover(); err != nil {
|
|
|
- c.conn.Info("Control::reader failed with error %v: %s", err, debug.Stack())
|
|
|
+ c.conn.Warn("Control::reader failed with error %v: %s", err, debug.Stack())
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
// kill everything if the reader stops
|
|
|
defer c.shutdown.Begin()
|
|
|
|
|
|
+ // notify that we're done
|
|
|
+ defer c.readerShutdown.Complete()
|
|
|
|
|
|
// read messages from the control channel
|
|
|
for {
|
|
@@ -253,6 +260,10 @@ func (c *Control) stopper() {
|
|
|
// remove ourself from the control registry
|
|
|
controlRegistry.Del(c.id)
|
|
|
|
|
|
+ // close the connection's read side so that reader() stops
|
|
|
+ c.conn.CloseRead()
|
|
|
+ c.readerShutdown.WaitComplete()
|
|
|
+
|
|
|
// shutdown manager() so that we have no more work to do
|
|
|
close(c.in)
|
|
|
c.managerShutdown.WaitComplete()
|
|
@@ -261,8 +272,7 @@ func (c *Control) stopper() {
|
|
|
close(c.out)
|
|
|
c.writerShutdown.WaitComplete()
|
|
|
|
|
|
- // close the connection
|
|
|
- // XXX: this will kill reader() ungracefully
|
|
|
+ // close connection fully
|
|
|
c.conn.Close()
|
|
|
|
|
|
// shutdown all of the tunnels
|