今回はchannelについてみてみるよ。
このxv6のcommiterの一人がGo言語の作り手であるRusCoxであるように、channelの概念はGo言語のそれとも関連がある。
先ず、channelなんだけど、これはproc構造体に地味に付いている。
struct proc { uint sz; // Size of process memory (bytes) pde_t* pgdir; // Page table char *kstack; // Bottom of kernel stack for this process enum procstate state; // Process state int pid; // Process ID struct proc *parent; // Parent process struct trapframe *tf; // Trap frame for current syscall struct context *context; // swtch() here to run process void *chan; // If non-zero, sleeping on chan int killed; // If non-zero, have been killed struct file *ofile[NOFILE]; // Open files struct inode *cwd; // Current directory char name[16]; // Process name (debugging) };
このchanだけど、使われる関数としては、xv6内では、sleepとwakeup.
sleepはsyscallにあるけどwakeupはない。Goで例えるなら、send(<-x)とreceive(x<-)か。
使われ方としては、先ず, sleep,wakeupは至る所にあって
1. 普通にsyscallで呼ばれる(channelは&ticks)
2. 子processをwaitする(channelは自proc構造体へのpointer)
3. pipeを作る(channelはpipe構造体のallocateされたbyte数)
4. blockdeviceへのlog書き込み(channelはlog構造体へのpointer)
5. その他spinlock以外の方法でlockを取る全ての処理(channelは様々)
xv6ではlockingの仕組みは2つしか用意しておらず、その1つがsleeplockだ。だからsyscallのsleepより汎用的な概念を持ち、
多用されている。したがって、汎用的な概念となっている理由は2つある引数(lockとchannel)の内、channelの多様性によるものだ。
process内でやっていることは以下の通りで単純。
void sleep(void *chan, struct spinlock *lk) { struct proc *p = myproc(); if(p == 0) panic("sleep"); if(lk == 0) panic("sleep without lk"); if(lk != &ptable.lock){ //DOC: sleeplock0 acquire(&ptable.lock); //DOC: sleeplock1 release(lk); } // Go to sleep. p->chan = chan; p->state = SLEEPING; sched(); // Tidy up. p->chan = 0; // Reacquire original lock. if(lk != &ptable.lock){ //DOC: sleeplock2 release(&ptable.lock); acquire(lk); } }
1. 現在processを取得する
2. lockがprocess tableではなかったら、次にprocessのstateを書き換える為に、process tableにもlockをかける
3. channelをprocessにsetする
4. channelをsleep状態に変更する
5. schedulerをpreemptionする
複数のprocessが同時に同一のchannelでsleepする事もできる。但し、一つのprocessが複数のchannelでsleepする事はできない。
schedulerは常に process tableの中を個々のprocessのstateのみをcheckして回っており、基本的にRunnaleなprocessをRunningする為のものだ。その為、sleepされた時点でschedulerがそのprocessをrunningすることはなくなる。
寝てしまったprocessを起こして(Runnable)にして、schedulerにより、Runningされる対象にする関数がwakeupだ。実態は以下のwakeup1にある。
static void wakeup1(void *chan) { struct proc *p; for(p = ptable.proc; p < &ptable.proc[NPROC]; p++) if(p->state == SLEEPING && p->chan == chan) p->state = RUNNABLE; }
これを見てわかる通り、複数のprocessが同時に同一のchannelでsleepしている場合、wakeupが呼ばれると、必ずしもFIFOキューの様にならない点に注意.
例えば、process2がsleepした後、process1がsleepした場合、schedulerの都合にもよるが、仮に同一channelを用いている場合、wakeupにより、process1が先に起きる事が生じ得る。これを防ぐ為には、channelとなる構造体の中で,process自体をpointerし直すとか、channel自体を順序付けされるものとして表現してやれば良い。pipeが良い例なので、ここでは、pipeを用いて説明する。
pipeは2つのfile descriptor(kernel内ではfile構造体)を引き受け、1つを書き込み専用、1つを読み込み専用としておく。
*f0)->type = FD_PIPE; (*f0)->readable = 1; (*f0)->writable = 0; (*f0)->pipe = p; (*f1)->type = FD_PIPE; (*f1)->readable = 0; (*f1)->writable = 1; (*f1)->pipe = p;
またそれとは別にpipe構造体に書き込み用channel,読み込み用channelというものを設ける。channelは書き込まれる分のbyte数分、channelが生成される。
p->nwrite = 0; p->nread = 0;
channelは、同時に1つしかアクセスを許さない。
新しく書き込む(writeをsleep状態にする)場合は事前に読み込み中(readがsleep状態)かどうかを確認し、終了したら、読み込み可能(readをwakeup)する。
新しく読みこむ場合は、読み込み中(readをsleep)に設定し、終了したら、再度書き込み可能(writeをwakeup)にする必要がある。
なので書き込み処理では、
for(i = 0; i < n; i++){ while(p->nwrite == p->nread + PIPESIZE){ //DOC: pipewrite-full if(p->readopen == 0 || myproc()->killed){ release(&p->lock); return -1; } wakeup(&p->nread); sleep(&p->nwrite, &p->lock); //DOC: pipewrite-sleep } p->data[p->nwrite++ % PIPESIZE] = addr[i]; } wakeup(&p->nread);
上の様に、以前、既に読み込みが行われているかどうかをwakeup(&p->nread)により確認し、
sleep(&p->nwrite, &p->lock)によって、byteのoffsetに対応したchannelに書き込みを行う。
dataを書き込んだら、外側のloopに移動し、実際のdataがpipe構造体の保持するbufferに吐き出され、
これをdataがなくなるまで、channelを生成し続ける。
逆にreadする時は、
int piperead(struct pipe *p, char *addr, int n) { int i; acquire(&p->lock); while(p->nread == p->nwrite && p->writeopen){ //DOC: pipe-empty if(myproc()->killed){ release(&p->lock); return -1; } sleep(&p->nread, &p->lock); //DOC: piperead-sleep } for(i = 0; i < n; i++){ //DOC: piperead-copy if(p->nread == p->nwrite) break; addr[i] = p->data[p->nread++ % PIPESIZE]; } wakeup(&p->nwrite); //DOC: piperead-wakeup release(&p->lock); return i; }
読み込もうとしている全データに対して予め別の読み込み処理が来ない様にlockをかけておき、データを読み終わった後に、上書き可能という意味で、書き込み可能を表すwakeupをchannelに対して送る。
とまあpipeに関してはそんな感じにしておいて、最後に、親子process間でのchannelを介したやりとりに関して。
大抵のsample code見ると次のshellの実装みたく、
if(fork1() == 0) runcmd(parsecmd(buf)); eventually call exec wait();
上記の様なもの見つけるけど、これによると
forkから返ってきた親processはwakeを経由してsleepに到達してる。で、子供の方はexecされて、終了したらexitで返ってくるっていう流れ。
これを詳しく説明すると、
1. 親processは親process自身のchannel上でsleepしてて
2. それを子供がexitしたら起こしにいく(親のstateをrunnableにする)
3. で、子供は子供自身のstateをunusedにせず(できず)、自分をゾンビとしておく
4. 一方、子から起こされた親process(running中)が再びwaitして、zombieの子供を殺し(stateをunusedにし)
5. 親processはwaitからreturnしますとさ
その時に親がchannel内で寝ないで、子供がexitする前に死んじゃうのがzombie.cのcodeなんだけど、
if(fork() > 0) sleep(5); // Let child exit before parent. exit();
これでも、exitの方、上手く書かれてて、
// Pass abandoned children to init. for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){ if(p->parent == curproc){ p->parent = initproc; if(p->state == ZOMBIE) wakeup1(initproc); } }
親がいなかったら、親の祖先を辿っていって、上の方に行くとinitが必ず寝てるので、そいつを起こしてzombieなので殺して
下さいとお願いするんだと。
なるほど上手くできてんね。ただpipeの例とかschedulerの回るoverheadが大きすぎだし、全部userlandでやるというGoの発想は確かに当然。今度Goのruntimeもみてみるか。