forkOSを作ったは良いけれど、このままだとスレッド間通信できないでゲソ。 STMをいきなり作りたいところでゲソが、まずはMVarを作らなイカ? とりあえずチケットは切ったでゲソ。
haddockの Control.Concurrent.MVar から読みはじめようと思うでゲッソ。 GHCのソースコードだとMVarはghc/libraries/base/GHC/MVar.hsで定義されているでゲソ。 まずカラッポのMVarを作ってみようじゃなイカ。
-- File: ghc/libraries/base/GHC/MVar.hs
-- | A mutable synchronisation variable: a boxed wrapper around the
-- primitive 'MVar#' in the 'RealWorld' state thread.
data MVar a = MVar (MVar# RealWorld a)

-- | Create a new 'MVar'.
--
-- Unwraps 'IO' to thread the state token through the 'newMVar#' primop and
-- boxes the primitive result in the 'MVar' constructor.
newEmptyMVar :: IO (MVar a)
newEmptyMVar = IO allocate
  where
    allocate world0 =
      case newMVar# world0 of
        (# world1, raw #) -> (# world1, MVar raw #)
-- File: ghc/libraries/ghc-prim/GHC/Types.hs
-- | An 'IO' action is represented as a function from a 'RealWorld' state
-- token to an unboxed pair of a state token and the action's result.
newtype IO a = IO (State# RealWorld -> (# State# RealWorld, a #))
このパターンはAjhcの中でも見かけたでゲソ。UIOじゃなイカ。
-- File: ajhc/lib/haskell-extras/Foreign/StablePtr.hs
-- | Create a 'StablePtr' for the given value.
--
-- The value is converted with 'toBang_' and handed to 'c_newStablePtr'
-- together with the world token; the raw result is converted back with
-- 'fromBang_', and the whole state-passing function is lifted into 'IO' by
-- 'fromUIO'.
newStablePtr :: a -> IO (StablePtr a)
newStablePtr x = fromUIO allocate
  where
    allocate world =
      case c_newStablePtr (toBang_ x) world of
        (# world', ptr #) -> (# world', fromBang_ ptr #)
-- File: ajhc/lib/jhc/Jhc/IO.hs
-- | Lift a raw state-passing 'UIO' function into 'IO' by wrapping it first in
-- the 'ST' constructor and then in the 'IO' constructor.
fromUIO :: UIO a -> IO a
fromUIO action = IO wrapped
  where
    wrapped = ST action
-- File: ajhc/lib/jhc-prim/Jhc/Prim/IO.hs
-- | State token type, indexed by a state thread; note its result kind is @#@.
data State_ :: * -> #
-- | Type used as the state-thread index by 'UIO' and 'IO' below.
data RealWorld :: *
-- | Raw state-passing function: takes a 'State_' token and returns an unboxed
-- pair of a 'State_' token and a result.
type UST s a = State_ s -> (# State_ s, a #)
-- | 'UST' specialised to the 'RealWorld' state thread.
type UIO a = UST RealWorld a
-- | Newtype wrapper around a 'UST' function.
newtype ST s a = ST (UST s a)
-- | 'IO' is a newtype over 'ST' fixed to the 'RealWorld' state thread.
newtype IO a = IO (ST RealWorld a)
無事MVarが得られたので、なにか入れてみようと思うでゲソ。つまりputMVarを使うでゲソ。
-- | Put a value into an 'MVar'.
--
-- Unwraps 'IO', passes the primitive MVar, the value and the state token to
-- the 'putMVar#' primop, and returns unit together with the state token the
-- primop hands back.
putMVar :: MVar a -> a -> IO ()
putMVar (MVar raw) x = IO store
  where
    store world0 =
      case putMVar# raw x world0 of
        world1 -> (# world1, () #)
やはりIOをほどいてプリミティブ関数に内容物をほおりこむでゲソ。
最後にMVarから内容物を取り出してみるでゲソ。
newMVar#プリミティブはどーなっているでゲソ? と、これは単にmvarを初期化しているだけのようでゲソ。
// File: ghc/rts/PrimOps.cmm
// RTS entry code for creating a new MVar (NOTE(review): presumably the
// implementation behind the newMVar# primop -- confirm against the primop
// table). Allocates a StgMVar on the heap, gives it the dirty-MVar header and
// initialises its head, tail and value fields to stg_END_TSO_QUEUE_closure,
// then returns the new object.
stg_newMVarzh
{
/* args: none */
W_ mvar;
// Make room for one StgMVar on the heap.
ALLOC_PRIM ( SIZEOF_StgMVar, NO_PTRS, stg_newMVarzh );
// Address of the new object, computed back from the heap pointer Hp.
mvar = Hp - SIZEOF_StgMVar + WDS(1);
SET_HDR(mvar,stg_MVAR_DIRTY_info,CCCS);
// MVARs start dirty: generation 0 has no mutable list
// All three fields start out as the END_TSO_QUEUE marker.
StgMVar_head(mvar) = stg_END_TSO_QUEUE_closure;
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
RET_P(mvar);
}
// File: ghc/rts/StgMiscClosures.cmm
/* ----------------------------------------------------------------------------
END_TSO_QUEUE
This is a static nullary constructor (like []) that we use to mark the
end of a linked TSO queue.
------------------------------------------------------------------------- */
// Info table plus entry code for the marker. The entry code only calls
// barf(): the object is a sentinel and must never be entered.
INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","END_TSO_QUEUE")
{ foreign "C" barf("END_TSO_QUEUE object entered!") never returns; }
// The closure that other code compares against / stores as the marker.
CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE);
またputMVar#というプリミティブも使っていたでゲソ。 さー長いソースコードでゲソ。気合いで読もうじゃなイカー。
// File: ghc/rts/PrimOps.cmm
// RTS entry code for putting a value into an MVar.
//   R1 = the MVar, R2 = the value to store.
//
// Three outcomes:
//  * the MVar already holds a value: the current thread is appended to the
//    MVar's queue and blocks via stg_block_putmvar;
//  * the MVar is empty and nothing is queued: the value is stored;
//  * the MVar is empty and a thread is queued: the value is written directly
//    onto that thread's stack (PerformTake) and the thread is woken up.
stg_putMVarzh
{
W_ mvar, val, info, tso, q;
/* args: R1 = MVar, R2 = value */
mvar = R1;
val = R2;
// Fetch the MVar's info pointer; the threaded RTS obtains it by locking
// the closure.
#if defined(THREADED_RTS)
("ptr" info) = foreign "C" lockClosure(mvar "ptr") [];
#else
info = GET_INFO(mvar);
#endif
// Write barrier: a clean MVar is reported via dirty_MVAR before it is
// mutated below.
if (info == stg_MVAR_CLEAN_info) {
foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
}
// The value slot is not the END_TSO_QUEUE marker, i.e. the MVar is full:
// enqueue the current thread on the MVar and block.
if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
// see Note [mvar-heap-check] above
HP_CHK_GEN_TICKY(SIZEOF_StgMVarTSOQueue, R1_PTR & R2_PTR, stg_putMVarzh);
TICK_ALLOC_PRIM(SIZEOF_StgMVarTSOQueue, 0, 0);
CCCS_ALLOC(SIZEOF_StgMVarTSOQueue);
// q is the freshly allocated queue cell describing the current thread.
q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
StgMVarTSOQueue_tso(q) = CurrentTSO;
// Append q: it becomes the head of an empty queue, otherwise it is linked
// after the current tail (whose mutation is then recorded).
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_head(mvar) = q;
} else {
StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
foreign "C" recordClosureMutated(MyCapability() "ptr",
StgMVar_tail(mvar)) [];
}
// Record on the TSO which queue cell and MVar it is blocked on, and why.
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
StgMVar_tail(mvar) = q;
// stg_block_putmvar saves R1 and R2 on the stack, so load them first.
R1 = mvar;
R2 = val;
jump stg_block_putmvar;
}
// The MVar is empty: walk the queue looking for a thread to hand the value to.
q = StgMVar_head(mvar);
loop:
if (q == stg_END_TSO_QUEUE_closure) {
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = val;
unlockClosure(mvar, stg_MVAR_DIRTY_info);
jump %ENTRY_CODE(Sp(0));
}
// Queue cells whose header is stg_IND_info or stg_MSG_NULL_info are skipped
// by following their indirectee field.
if (StgHeader_info(q) == stg_IND_info ||
StgHeader_info(q) == stg_MSG_NULL_info) {
q = StgInd_indirectee(q);
goto loop;
}
// There are takeMVar(s) waiting: wake up the first one
tso = StgMVarTSOQueue_tso(q);
// Unlink q; if that empties the queue, reset the tail as well.
StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
// actually perform the takeMVar
W_ stack;
stack = StgTSO_stackobj(tso);
PerformTake(stack, val);
// indicate that the MVar operation has now completed.
StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
// PerformTake wrote to the stack object, so mark it dirty if it is not yet.
if (TO_W_(StgStack_dirty(stack)) == 0) {
foreign "C" dirty_STACK(MyCapability() "ptr", stack "ptr") [];
}
foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
unlockClosure(mvar, stg_MVAR_DIRTY_info);
jump %ENTRY_CODE(Sp(0));
}
// Complete a take on behalf of another thread by writing onto its stack
// object: `value` goes into the slot one word above the stack's saved sp, and
// stg_gc_unpt_r1_info into the slot at sp itself.
// NOTE(review): the expansion declares a local `sp`, so it presumably cannot
// be used twice in the same scope -- confirm.
#define PerformTake(stack, value) \
W_ sp; \
sp = StgStack_sp(stack); \
W_[sp + WDS(1)] = value; \
W_[sp + WDS(0)] = stg_gc_unpt_r1_info;
// File: ghc/includes/rts/storage/SMPClosureOps.h
/*
 * Lock closure `p` by swapping stg_WHITEHOLE_info into its header's info
 * pointer with xchg(), and return the info pointer that was there before.
 *
 * If the previous value was already stg_WHITEHOLE_info (presumably meaning
 * the closure is locked elsewhere), the swap is retried: up to SPIN_COUNT
 * attempts in a row, then yieldThread() is called and spinning starts over.
 * The function returns only once a non-WHITEHOLE info pointer was obtained.
 */
EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
{
StgWord info;
do {
nat i = 0;
do {
info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
if (info != (W_)&stg_WHITEHOLE_info) return (StgInfoTable *)info;
} while (++i < SPIN_COUNT);
/* Spin budget used up: give up the CPU before trying again. */
yieldThread();
} while (1);
}
// File: ghc/rts/sm/Storage.c
/*
This is the write barrier for MVARs. An MVAR_CLEAN object is not
on the mutable list; a MVAR_DIRTY is. When written to, a
MVAR_CLEAN turns into a MVAR_DIRTY and is put on the mutable list.
The check for MVAR_CLEAN is inlined at the call site for speed,
this really does make a difference on concurrency-heavy benchmarks
such as Chameneos and cheap-concurrency.

reg: register table of the calling capability; it is converted with
     regTableToCapability() to find the capability to record on.
p:   the MVar closure being written to.
*/
void
dirty_MVAR(StgRegTable *reg, StgClosure *p)
{
recordClosureMutated(regTableToCapability(reg),p);
}
// File: ghc/rts/Capability.h
/*
 * Append closure `p` to capability `cap`'s mutable list for generation `gen`.
 *
 * The list is a chain of blocks. When the block at the front is full, a new
 * block is allocated, linked in front of the old one and installed as the
 * new front of the list before `p` is written into it.
 */
EXTERN_INLINE void
recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
    // We must own this Capability in order to modify its mutable list.
    //    ASSERT(cap->running_task == myTask());
    // NO: assertion is violated by performPendingThrowTos()
    bdescr *block = cap->mut_lists[gen];

    if (block->free >= block->start + BLOCK_SIZE_W) {
        /* Front block is full: chain a fresh one in front of it. */
        bdescr *fresh = allocBlock_lock();
        fresh->link = block;
        cap->mut_lists[gen] = fresh;
        block = fresh;
    }

    *block->free = (StgWord)p;
    block->free++;
}
/*
 * Record that closure `p` has been mutated, on behalf of capability `cap`.
 *
 * Looks up the block descriptor of `p`; closures whose block is in
 * generation 0 are ignored, all others are added to the capability's mutable
 * list for their generation.
 */
EXTERN_INLINE void
recordClosureMutated (Capability *cap, StgClosure *p)
{
    bdescr *block = Bdescr((StgPtr)p);

    if (block->gen_no == 0) {
        return;
    }
    recordMutableCap(p, cap, block->gen_no);
}
// File: ghc/rts/HeapStackCheck.cmm
// Continuation handed to BLOCK_BUT_FIRST by stg_block_putmvar: unlocks the
// closure held in R3 (the MVar) with stg_MVAR_DIRTY_info as its info pointer,
// then jumps to StgReturn.
stg_block_putmvar_finally
{
unlockClosure(R3, stg_MVAR_DIRTY_info);
jump StgReturn;
}
// Block the current thread in the middle of a putMVar.
//   R1 = the MVar, R2 = the value (as set up by stg_putMVarzh).
// Pushes a three-word frame holding stg_block_putmvar_info, R1 and R2, copies
// R1 into R3 for stg_block_putmvar_finally, and blocks via BLOCK_BUT_FIRST.
stg_block_putmvar
{
Sp_adj(-3);
Sp(2) = R2;
Sp(1) = R1;
Sp(0) = stg_block_putmvar_info;
// stg_block_putmvar_finally unlocks whatever closure is in R3.
R3 = R1;
BLOCK_BUT_FIRST(stg_block_putmvar_finally);
}
// File: ghc/rts/Threads.c
/*
 * Try to make thread `tso` runnable again, acting on capability `cap`.
 *
 * In the threaded RTS, if the thread belongs to a different capability, a
 * MSG_TRY_WAKEUP message is sent to that capability and nothing else is done
 * here. Otherwise the thread is appended to cap's run queue, provided its
 * why_blocked state allows it (see the individual cases); in all other
 * states the call is a no-op.
 */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
traceEventThreadWakeup (cap, tso, tso->cap->no);
#ifdef THREADED_RTS
if (tso->cap != cap)
{
/* Not our thread: ask the owning capability to do the wakeup. */
MessageWakeup *msg;
msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
msg->tso = tso;
sendMessage(cap, tso->cap, (Message*)msg);
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
(W_)tso->id, tso->cap->no);
return;
}
#endif
switch (tso->why_blocked)
{
case BlockedOnMVar:
{
/* Only wake the thread once its _link has been reset to END_TSO_QUEUE;
   otherwise leave it blocked. */
if (tso->_link == END_TSO_QUEUE) {
tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
goto unblock;
} else {
return;
}
}
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
/* Read the message's info pointer by locking and immediately unlocking
   it; the thread stays blocked unless it is stg_MSG_NULL_info. */
i = lockClosure(tso->block_info.closure);
unlockClosure(tso->block_info.closure, i);
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(W_)tso->id, tso->block_info.throwto->header.info);
return;
}
// remove the block frame from the stack
ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
tso->stackobj->sp += 3;
goto unblock;
}
/* These states are always woken unconditionally. */
case BlockedOnBlackHole:
case BlockedOnSTM:
case ThreadMigrating:
goto unblock;
default:
// otherwise, do nothing
return;
}
unblock:
// just run the thread now, if the BH is not really available,
// we'll block again.
tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
// We used to set the context switch flag here, which would
// trigger a context switch a short time in the future (at the end
// of the current nursery block). The idea is that we have just
// woken up a thread, so we may need to load-balance and migrate
// threads to other CPUs. On the other hand, setting the context
// switch flag here unfairly penalises the current thread by
// yielding its time slice too early.
//
// The synthetic benchmark nofib/smp/chan can be used to show the
// difference quite clearly.
// cap->context_switch = 1;
}
xxx takeMVarについても調べること
xxx リメンバーセットのGCのためにファイナライザの実装が必要
testsuite/tests/concurrent/should_run/conc069.hs at master · ghc/testsuite が動けばいいんじゃなイカ? forkIOはforkOSの別名でとりあえず良しとしたいところでゲソ。 というかforkIOと同一視して良いのであればもっと良いテストがころがっていそうなもんでゲソ。
blog comments powered by