(作成中) Implement MVar.

Posted on June 21, 2013 / Tags: jhc, ajhc, thread, mvar

Table of contents


forkOSを作ったは良いけれど、このままだとスレッド間通信できないでゲソ。 STMをいきなり作りたいところでゲソが、まずはMVarを作らなイカ? とりあえず チケット は切ったでゲソ。

GHCの実装を解析するでゲソ

Haskell層

haddockの Control.Concurrent.MVar から読みはじめようと思うでゲッソ。 GHCのソースコードだとMVarはghc/libraries/base/GHC/MVar.hsで定義されているでゲソ。 まずカラッポのMVarを作ってみようじゃなイカ。

-- File: ghc/libraries/base/GHC/MVar.hs
-- | Boxed handle for a mutable variable: a single constructor wrapping the
-- primitive @MVar#@ type instantiated at the @RealWorld@ state thread.
data MVar a = MVar (MVar# RealWorld a)

-- | Create a new 'MVar'.  The action unwraps 'IO', calls the @newMVar#@
-- primitive with the incoming state token, and boxes the primitive it
-- returns in the 'MVar' constructor alongside the new state token.
newEmptyMVar  :: IO (MVar a)
newEmptyMVar = IO $ \ s# ->
    case newMVar# s# of
         (# s2#, svar# #) -> (# s2#, MVar svar# #)

-- File: ghc/libraries/ghc-prim/GHC/Types.hs
-- | An 'IO' action is a function from the @RealWorld@ state token to an
-- unboxed pair of the new state token and the action's result.
newtype IO a = IO (State# RealWorld -> (# State# RealWorld, a #))

このパターンはAjhcの中でも見かけたでゲソ。UIOじゃなイカ。

-- File: ajhc/lib/haskell-extras/Foreign/StablePtr.hs
-- | Create a 'StablePtr' for @x@.  The value is converted with @toBang_@ and
-- handed to @c_newStablePtr@ together with the world token @w@; the returned
-- value is converted back with @fromBang_@.  'fromUIO' lifts this
-- state-passing function into 'IO'.
newStablePtr :: a -> IO (StablePtr a)
newStablePtr x = do
    fromUIO $ \w -> case c_newStablePtr (toBang_ x) w of
        (# w', s #) -> (# w', fromBang_ s #)

-- File: ajhc/lib/jhc/Jhc/IO.hs
-- | Lift a raw state-passing function ('UIO') into 'IO' by wrapping it first
-- in the 'ST' constructor and then in the 'IO' constructor.
fromUIO :: UIO a -> IO a
fromUIO x = IO (ST x)

-- File: ajhc/lib/jhc-prim/Jhc/Prim/IO.hs
-- | State token type, indexed by a state thread; declared with kind @* -> #@.
data State_ :: * -> #
-- | Index type naming the real-world state thread.
data RealWorld :: *
-- | Raw state transformer: takes a state token and returns an unboxed pair
-- of the new token and a result.
type UST s a = State_ s -> (# State_ s, a #)
-- | 'UST' specialised to the 'RealWorld' state thread.
type UIO a = UST RealWorld a
-- | Newtype wrapper around a raw 'UST' function.
newtype ST s a = ST (UST s a)
-- | 'IO' is 'ST' fixed to the 'RealWorld' state thread.
newtype IO a = IO (ST RealWorld a)

無事MVarが得られたので、なにか入れてみようと思うでゲソ。つまりputMVarを使うでゲソ。

-- | Store @x@ in the 'MVar'.  The boxed 'MVar' is unwrapped and the
-- @putMVar#@ primitive is applied to the primitive variable, the value and
-- the state token.  The primitive yields only a new state token, which is
-- paired with @()@ as the 'IO' result.
putMVar  :: MVar a -> a -> IO ()
putMVar (MVar mvar#) x = IO $ \ s# ->
    case putMVar# mvar# x s# of
        s2# -> (# s2#, () #)

やはりIOをほどいてプリミティブ関数に内容物をほうりこむでゲソ。

最後にMVarから内容物を取り出してみるでゲソ。

-- | Take the contents of the 'MVar'.  The boxed 'MVar' is unwrapped and the
-- @takeMVar#@ primitive is applied to the primitive variable and the state
-- token; its result is used directly as the result of the 'IO' function, so
-- no re-boxing @case@ is needed.
takeMVar :: MVar a -> IO a
takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#

cmm層

newMVar#プリミティブはどーなっているでゲソ? と、これは単にmvarを初期化しているだけのようでゲソ。

// File: ghc/rts/PrimOps.cmm
// Entry code for the newMVar# primitive.
// Allocates SIZEOF_StgMVar bytes, writes the MVAR_DIRTY header, initialises
// the head, tail and value fields to stg_END_TSO_QUEUE_closure, and returns
// the pointer to the new object.
stg_newMVarzh
{
    /* args: none */
    W_ mvar;

    // Reserve heap space for the object; stg_newMVarzh itself is passed as the
    // last argument (presumably the re-entry point if allocation fails -- confirm).
    ALLOC_PRIM ( SIZEOF_StgMVar, NO_PTRS, stg_newMVarzh );
  
    // Compute the start address of the object from the heap pointer Hp.
    mvar = Hp - SIZEOF_StgMVar + WDS(1);
    SET_HDR(mvar,stg_MVAR_DIRTY_info,CCCS);
        // MVARs start dirty: generation 0 has no mutable list
    // All three fields start out as the END_TSO_QUEUE sentinel; stg_putMVarzh
    // treats value == stg_END_TSO_QUEUE_closure as "no value stored".
    StgMVar_head(mvar)  = stg_END_TSO_QUEUE_closure;
    StgMVar_tail(mvar)  = stg_END_TSO_QUEUE_closure;
    StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
    RET_P(mvar);
}

// File: ghc/rts/StgMiscClosures.cmm
/* ----------------------------------------------------------------------------
   END_TSO_QUEUE

   This is a static nullary constructor (like []) that we use to mark the
   end of a linked TSO queue.
   ------------------------------------------------------------------------- */

// The object is used only as a sentinel value: its entry code calls barf()
// with an error message and never returns.
INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","END_TSO_QUEUE")
{ foreign "C" barf("END_TSO_QUEUE object entered!") never returns; }

// The single static closure that is compared against as the sentinel.
CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE);

またputMVar#というプリミティブも使っていたでゲソ。 さー長いソースコードでゲソ。気合いで読もうじゃなイカー。

// File: ghc/rts/PrimOps.cmm
// Entry code for the putMVar# primitive (R1 = MVar, R2 = value).
//
// Outline:
//   1. Lock the MVar (THREADED_RTS only) and run the write barrier if it was
//      clean.
//   2. If the MVar already holds a value, allocate a queue entry for the
//      current thread, append it to the MVar's queue, mark the thread as
//      BlockedOnMVar and jump to stg_block_putmvar.
//   3. Otherwise walk the queue from the head.  If it is empty, store the
//      value and return.  If a thread is waiting, dequeue it, write the value
//      onto its stack (PerformTake) and call tryWakeupThread for it.
stg_putMVarzh
{
    W_ mvar, val, info, tso, q;

    /* args: R1 = MVar, R2 = value */
    mvar = R1;
    val  = R2;

    // In the threaded RTS the closure is locked and its previous info pointer
    // is returned; otherwise the info pointer is simply read.
#if defined(THREADED_RTS)
    ("ptr" info) = foreign "C" lockClosure(mvar "ptr") [];
#else
    info = GET_INFO(mvar);
#endif

    // Write barrier: a clean MVar is reported via dirty_MVAR before it is
    // modified.
    if (info == stg_MVAR_CLEAN_info) {
        foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
    }

    // Case: the MVar already holds a value, so this thread has to block.
    if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {

        // see Note [mvar-heap-check] above
        HP_CHK_GEN_TICKY(SIZEOF_StgMVarTSOQueue, R1_PTR & R2_PTR, stg_putMVarzh);
        TICK_ALLOC_PRIM(SIZEOF_StgMVarTSOQueue, 0, 0);
        CCCS_ALLOC(SIZEOF_StgMVarTSOQueue);

        // Address of the freshly allocated queue entry.
        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);

        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
        StgMVarTSOQueue_tso(q)  = CurrentTSO;

        // Append q: it becomes the head if the queue is empty, otherwise it is
        // linked after the current tail (and that tail is recorded as mutated).
	if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
	    StgMVar_head(mvar) = q;
	} else {
            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
            foreign "C" recordClosureMutated(MyCapability() "ptr",
                                             StgMVar_tail(mvar)) [];
	}
        // Record on the current thread what it is blocked on, then make q the
        // new tail of the MVar's queue.
	StgTSO__link(CurrentTSO)       = q;
	StgTSO_block_info(CurrentTSO)  = mvar;
	StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
	StgMVar_tail(mvar)             = q;

        // Pass the MVar and the value on to the blocking code in R1/R2.
        R1 = mvar;
        R2 = val;
	jump stg_block_putmvar;
    }
  
    // Case: no value stored.  Look for a waiting thread, starting at the head.
    q = StgMVar_head(mvar);
loop:
    if (q == stg_END_TSO_QUEUE_closure) {
	/* No further takes, the MVar is now full. */
	StgMVar_value(mvar) = val;
    	unlockClosure(mvar, stg_MVAR_DIRTY_info);
	jump %ENTRY_CODE(Sp(0));
    }
    // Queue entries whose header is IND or MSG_NULL are skipped by following
    // their indirectee field.
    if (StgHeader_info(q) == stg_IND_info ||
        StgHeader_info(q) == stg_MSG_NULL_info) {
        q = StgInd_indirectee(q);
        goto loop;
    }

    // There are takeMVar(s) waiting: wake up the first one
    
    // Dequeue q; if that empties the queue, reset the tail as well.
    tso = StgMVarTSOQueue_tso(q);
    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    }

    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
    ASSERT(StgTSO_block_info(tso) == mvar);

    // actually perform the takeMVar
    W_ stack;
    stack = StgTSO_stackobj(tso);
    PerformTake(stack, val);

    // indicate that the MVar operation has now completed.
    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;

    // The waiting thread's stack was just written to; report it via
    // dirty_STACK if its dirty field is still zero.
    if (TO_W_(StgStack_dirty(stack)) == 0) {
        foreign "C" dirty_STACK(MyCapability() "ptr", stack "ptr") [];
    }
    
    foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];

    // The value went straight to the waiting thread, so the MVar's value
    // field is left untouched; unlock and return to the caller.
    unlockClosure(mvar, stg_MVAR_DIRTY_info);
    jump %ENTRY_CODE(Sp(0));
}

// PerformTake(stack, value): declares a local `sp`, loads the stack pointer
// of `stack` into it, then stores `value` at word offset 1 and
// stg_gc_unpt_r1_info at word offset 0 from that pointer.
// Because it declares `sp`, it can be expanded only once per scope.
#define PerformTake(stack, value)               \
    W_ sp;                                      \
    sp = StgStack_sp(stack);                    \
    W_[sp + WDS(1)] = value;                    \
    W_[sp + WDS(0)] = stg_gc_unpt_r1_info;

// File: ghc/includes/rts/storage/SMPClosureOps.h
// Lock closure `p` by swapping its header info pointer for
// stg_WHITEHOLE_info, and return the info pointer it had before.
// If the old value was already stg_WHITEHOLE_info the exchange is retried,
// up to SPIN_COUNT times, before calling yieldThread() and starting over.
EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
{
    StgWord info;
    do {
	nat i = 0;
	do {
	    // xchg stores WHITEHOLE and yields the previous header value.
	    info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
	    // Any value other than WHITEHOLE is the real info pointer: done.
	    if (info != (W_)&stg_WHITEHOLE_info) return (StgInfoTable *)info;
	} while (++i < SPIN_COUNT);
	yieldThread();
    } while (1);
}

// File: ghc/rts/sm/Storage.c
/*
   This is the write barrier for MVARs.  An MVAR_CLEAN object is not
   on the mutable list; a MVAR_DIRTY is.  When written to, a
   MVAR_CLEAN turns into a MVAR_DIRTY and is put on the mutable list.
   The check for MVAR_CLEAN is inlined at the call site for speed,
   this really does make a difference on concurrency-heavy benchmarks
   such as Chameneos and cheap-concurrency.
*/
// Converts the register table pointer to its Capability and forwards to
// recordClosureMutated.
void
dirty_MVAR(StgRegTable *reg, StgClosure *p)
{
    recordClosureMutated(regTableToCapability(reg),p);
}

// File: ghc/rts/Capability.h
// Append closure `p` to the mutable list that capability `cap` keeps for
// generation `gen`.  When the current block of that list is full, a new block
// is allocated and linked in front of it first.
EXTERN_INLINE void
recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
    bdescr *bd;

    // We must own this Capability in order to modify its mutable list.
    //    ASSERT(cap->running_task == myTask());
    // NO: assertion is violated by performPendingThrowTos()
    bd = cap->mut_lists[gen];
    // Full block: the free pointer has reached start + BLOCK_SIZE_W.
    if (bd->free >= bd->start + BLOCK_SIZE_W) {
	bdescr *new_bd;
	new_bd = allocBlock_lock();
	new_bd->link = bd;
	bd = new_bd;
	cap->mut_lists[gen] = bd;
    }
    // Store the pointer and advance the block's free pointer.
    *bd->free++ = (StgWord)p;
}

// Record closure `p` as mutated: look up the block descriptor for `p` and,
// unless it belongs to generation 0, add `p` to `cap`'s mutable list for
// that generation.
EXTERN_INLINE void
recordClosureMutated (Capability *cap, StgClosure *p)
{
    bdescr *bd;
    bd = Bdescr((StgPtr)p);
    if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no);
}

// File: ghc/rts/HeapStackCheck.cmm
// Passed to BLOCK_BUT_FIRST by stg_block_putmvar.  Unlocks the MVar held in
// R3, leaving it with the MVAR_DIRTY info pointer, then jumps to StgReturn.
stg_block_putmvar_finally
{
    unlockClosure(R3, stg_MVAR_DIRTY_info);
    jump StgReturn;
}

// Blocking path of putMVar#; stg_putMVarzh jumps here with R1 = MVar and
// R2 = value.  Pushes a three-word frame (stg_block_putmvar_info, R1, R2)
// onto the stack, copies the MVar into R3 for stg_block_putmvar_finally,
// and blocks via BLOCK_BUT_FIRST.
stg_block_putmvar
{
    Sp_adj(-3);
    Sp(2) = R2;
    Sp(1) = R1;
    Sp(0) = stg_block_putmvar_info;
    R3 = R1;
    BLOCK_BUT_FIRST(stg_block_putmvar_finally);
}

// File: ghc/rts/Threads.c
// Try to make thread `tso` runnable on capability `cap`.
//
// In the threaded RTS, a thread owned by a different capability is not
// touched directly: a MessageWakeup is allocated and sent to the owning
// capability instead.  Otherwise the thread's why_blocked field decides
// whether it is unblocked (marked NotBlocked and appended to cap's run
// queue) or left alone.
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#ifdef THREADED_RTS
    if (tso->cap != cap)
    {
        // The thread belongs to another capability: send it a wakeup message.
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (W_)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    {
        // Unblock only if _link is END_TSO_QUEUE, which stg_putMVarzh sets to
        // indicate that the MVar operation has completed.
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;
        
        // Lock and immediately unlock the message to read its info pointer.
        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        // Anything other than MSG_NULL means the thread stays blocked.
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (W_)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->stackobj->sp += 3;
        goto unblock;
    }

    // These blocking reasons are unblocked unconditionally.
    case BlockedOnBlackHole:
    case BlockedOnSTM:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block).  The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs.  On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.

    // cap->context_switch = 1;
}

xxx takeMVarについても調べること

作戦

xxx リメンバーセットのGCのためにファイナライザの実装が必要

実装

テスト

testsuite/tests/concurrent/should_run/conc069.hs at master · ghc/testsuite が動けばいいんじゃなイカ? forkIOはforkOSの別名でとりあえず良しとしたいところでゲソ。 というかforkIOと同一視して良いのであればもっと良いテストがころがっていそうなもんでゲソ。

blog comments powered by Disqus