2010/07/17

Concurrent Haskell

最近做的東西是 encoding session types in Agda,我這邊目前最終的希望能把 Agda terms 編譯成 Concurrent Haskell 程式。這樣的話,總得要先會寫 Concurrent Haskell 吧!以下是我設想應該要翻譯出來的程式(的加糖版本):

 1: {-# OPTIONS -XScopedTypeVariables #-}
 2: 
 3: import Control.Concurrent
 4:   (forkIO, MVar, newEmptyMVar, putMVar, takeMVar)
 5: import Data.Function
 6: import Unsafe.Coerce
 7: 
 8: -- untyped synchronous channels
 9: 
10: data Chan = Chan (MVar ()) (MVar ())
11: 
12: newChan :: IO Chan
13: newChan =
14:   do d <- newEmptyMVar
15:      a <- newEmptyMVar
16:      putMVar a ()
17:      return (Chan d a)
18: 
19: readChan :: Chan -> IO a
20: readChan (Chan d a) =
21:   do v <- takeMVar (unsafeCoerce d)
22:      putMVar a ()
23:      return v
24: 
25: writeChan :: Chan -> a -> IO ()
26: writeChan (Chan d a) v =
27:   do takeMVar a
28:      putMVar d (unsafeCoerce v)
29: 
30: -- service (channel) provider
31: 
32: data Service = Service (MVar (MVar Chan)) (MVar (MVar Chan))
33: 
34: accept :: Service -> IO Chan
35: accept (Service acc req) =
36:   do m <- newEmptyMVar
37:      putMVar acc m
38:      c <- takeMVar m
39:      return c
40: 
41: request :: Service -> IO Chan
42: request (Service acc req) =
43:   do m <- newEmptyMVar
44:      putMVar req m
45:      c <- takeMVar m
46:      return c
47: 
48: newService :: IO Service
49: newService =
50:   do acc <- newEmptyMVar
51:      req <- newEmptyMVar
52:      forkIO $ fix (\s ->
53:        do m1 <- takeMVar acc
54:           m2 <- takeMVar req
55:           c <- newChan
56:           putMVar m1 c
57:           putMVar m2 c
58:           s)
59:      return (Service acc req)
60: 
61: -- test program
62: 
63: server :: Service -> IO ()
64: server s =
65:   do c1 <- accept s
66:      n :: Integer <- readChan c1
67:      c2 :: Chan <- newChan
68:      writeChan c1 c2
69:      xs :: [Integer] <-
70:        sequence (take (fromInteger n) (repeat (readChan c2)))  -- simplified
71:      writeChan c1 (sum xs)
72:      return ()
73: 
74: client :: Service -> IO Integer
75: client s =
76:   do c1 <- request s
77:      writeChan c1 (2 :: Integer)
78:      c2 :: Chan <- readChan c1
79:      writeChan c2 (512 :: Integer)
80:      writeChan c2 (256 :: Integer)
81:      v :: Integer <- readChan c1
82:      return v
83: 
84: main :: IO ()
85: main =
86:   do s <- newService
87:      forkIO (server s)
88:      v <- client s
89:      print v

不過這是剛好可以動的版本,如果有兩組以上的 server/client 就會當掉。持續實驗中⋯

--
因為在 Agda 已經過了 typecheck,翻譯成 Haskell 理論上就不用堅持 channels 必須是 typed,hence the uses of unsafeCoerce。


去辦台胞證,捷運上想到這樣做其實還弄不出 synchronous channels⋯ 寫的人可能一寫完離開就又進去自己把東西拿走了,但正確行為應該要卡住,等讀的人拿走才離開。


改成這樣似乎可以:

-- untyped synchronous channels

data Chan = Chan (MVar ()) (MVar ())

newChan :: IO Chan
newChan =
  do d <- newEmptyMVar
     a <- newEmptyMVar
     return (Chan d a)

readChan :: Chan -> IO a
readChan (Chan d a) =
  do v <- takeMVar (unsafeCoerce d)
     putMVar a ()
     return v

writeChan :: Chan -> a -> IO ()
writeChan (Chan d a) v = 
  do putMVar d (unsafeCoerce v)
     takeMVar a

另外「main thread 一停、整個程式就停」的行為很討厭,所以我寫了一個 "Join" 等那些分岔出去的 threads 結束。

newtype Join = Join (MVar [MVar ()])

newJoin :: IO Join
newJoin = newMVar [] >>= return . Join

join :: Join -> IO ()
join (Join j) = takeMVar j >>= mapM_ takeMVar >> putMVar j []

forkIO' :: Join -> IO () -> IO ThreadId
forkIO' (Join j) p =
  do m <- newEmptyMVar
     modifyMVar_ j (return . (m:))
     forkIO (p >> putMVar m ())

現在的 main program 就可以寫

main :: IO ()
main =
  do j <- newJoin
     s <- newService
     forkIO' j (server s)
     forkIO' j (client s)
     forkIO' j (client s)
     forkIO' j (client s)
     forkIO' j (server s)
     forkIO' j (server s)
     join j

--
這樣的翻譯應該是忠實的?挺難證的樣子,但不證這類東西感覺上就沒有 contribution 啦!

Labels: