Notes

Do not read codes but patch binary.

xv6 code reading(7.channel編)

今回はchannelについてみてみるよ。

このxv6のcommiterの一人がGo言語の作り手であるRusCoxであるように、channelの概念はGo言語のそれとも関連がある。
先ず、channelなんだけど、これはproc構造体に地味に付いている。

struct proc {
  uint sz;                     // Size of process memory (bytes)
  pde_t* pgdir;                // Page table
  char *kstack;                // Bottom of kernel stack for this process
  enum procstate state;        // Process state
  int pid;                     // Process ID
  struct proc *parent;         // Parent process
  struct trapframe *tf;        // Trap frame for current syscall
  struct context *context;     // swtch() here to run process
  void *chan;                  // If non-zero, sleeping on chan
  int killed;                  // If non-zero, have been killed
  struct file *ofile[NOFILE];  // Open files
  struct inode *cwd;           // Current directory
  char name[16];               // Process name (debugging)
};

このchanだけど、使われる関数としては、xv6内では、sleepとwakeup.
sleepはsyscallにあるけどwakeupはない。Goで例えるなら、send(<-x)とreceive(x<-)か。
使われ方としては、先ず, sleep,wakeupは至る所にあって

1. 普通にsyscallで呼ばれる(channelは&ticks)
2. 子processをwaitする(channelは自proc構造体へのpointer)
3. pipeを作る(channelはpipe構造体のallocateされたbyte数)
4. blockdeviceへのlog書き込み(channelはlog構造体へのpointer)
5. その他spinlock以外の方法でlockを取る全ての処理(channelは様々)

xv6ではlockingの仕組みは2つしか用意しておらず、その1つがsleeplockだ。だからsyscallのsleepより汎用的な概念を持ち、
多用されている。したがって、汎用的な概念となっている理由は2つある引数(lockとchannel)の内、channelの多様性によるものだ。
process内でやっていることは以下の通りで単純。

void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();
  
  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");

  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }
  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }
}

1. 現在processを取得する
2. lockがprocess tableではなかったら、次にprocessのstateを書き換える為に、process tableにもlockをかける
3. channelをprocessにsetする
4. channelをsleep状態に変更する
5. schedulerをpreemptionする

複数のprocessが同時に同一のchannelでsleepする事もできる。但し、一つのprocessが複数のchannelでsleepする事はできない。

schedulerは常に process tableの中を個々のprocessのstateのみをcheckして回っており、基本的にRunnaleなprocessをRunningする為のものだ。その為、sleepされた時点でschedulerがそのprocessをrunningすることはなくなる。

寝てしまったprocessを起こして(Runnable)にして、schedulerにより、Runningされる対象にする関数がwakeupだ。実態は以下のwakeup1にある。

static void
wakeup1(void *chan)
{
  struct proc *p;
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}

これを見てわかる通り、複数のprocessが同時に同一のchannelでsleepしている場合、wakeupが呼ばれると、必ずしもFIFOキューの様にならない点に注意.
例えば、process2がsleepした後、process1がsleepした場合、schedulerの都合にもよるが、仮に同一channelを用いている場合、wakeupにより、process1が先に起きる事が生じ得る。これを防ぐ為には、channelとなる構造体の中で,process自体をpointerし直すとか、channel自体を順序付けされるものとして表現してやれば良い。pipeが良い例なので、ここでは、pipeを用いて説明する。

pipeは2つのfile descriptor(kernel内ではfile構造体)を引き受け、1つを書き込み専用、1つを読み込み専用としておく。

*f0)->type = FD_PIPE;
  (*f0)->readable = 1;
  (*f0)->writable = 0;
  (*f0)->pipe = p;
  (*f1)->type = FD_PIPE;
  (*f1)->readable = 0;
  (*f1)->writable = 1;
  (*f1)->pipe = p;

またそれとは別にpipe構造体に書き込み用channel,読み込み用channelというものを設ける。channelは書き込まれる分のbyte数分、channelが生成される。

p->nwrite = 0;
p->nread = 0;

channelは、同時に1つしかアクセスを許さない。
新しく書き込む(writeをsleep状態にする)場合は事前に読み込み中(readがsleep状態)かどうかを確認し、終了したら、読み込み可能(readをwakeup)する。
新しく読みこむ場合は、読み込み中(readをsleep)に設定し、終了したら、再度書き込み可能(writeをwakeup)にする必要がある。

なので書き込み処理では、

for(i = 0; i < n; i++){
    while(p->nwrite == p->nread + PIPESIZE){  //DOC: pipewrite-full
      if(p->readopen == 0 || myproc()->killed){
        release(&p->lock);
        return -1;
      }
      wakeup(&p->nread);
      sleep(&p->nwrite, &p->lock);  //DOC: pipewrite-sleep
    }
    p->data[p->nwrite++ % PIPESIZE] = addr[i];
  }
  wakeup(&p->nread);

上の様に、以前、既に読み込みが行われているかどうかをwakeup(&p->nread)により確認し、
sleep(&p->nwrite, &p->lock)によって、byteのoffsetに対応したchannelに書き込みを行う。
dataを書き込んだら、外側のloopに移動し、実際のdataがpipe構造体の保持するbufferに吐き出され、
これをdataがなくなるまで、channelを生成し続ける。

逆にreadする時は、

int
piperead(struct pipe *p, char *addr, int n)
{
  int i;
  acquire(&p->lock);
  while(p->nread == p->nwrite && p->writeopen){  //DOC: pipe-empty
    if(myproc()->killed){
      release(&p->lock);
      return -1;
    }
    sleep(&p->nread, &p->lock); //DOC: piperead-sleep
  }
  for(i = 0; i < n; i++){  //DOC: piperead-copy
    if(p->nread == p->nwrite)
      break;
    addr[i] = p->data[p->nread++ % PIPESIZE];
  }
  wakeup(&p->nwrite);  //DOC: piperead-wakeup
  release(&p->lock);
  return i;
}

読み込もうとしている全データに対して予め別の読み込み処理が来ない様にlockをかけておき、データを読み終わった後に、上書き可能という意味で、書き込み可能を表すwakeupをchannelに対して送る。

とまあpipeに関してはそんな感じにしておいて、最後に、親子process間でのchannelを介したやりとりに関して。

大抵のsample code見ると次のshellの実装みたく、

if(fork1() == 0)
      runcmd(parsecmd(buf)); eventually call exec
    wait();

上記の様なもの見つけるけど、これによると
forkから返ってきた親processはwakeを経由してsleepに到達してる。で、子供の方はexecされて、終了したらexitで返ってくるっていう流れ。

これを詳しく説明すると、
1. 親processは親process自身のchannel上でsleepしてて
2. それを子供がexitしたら起こしにいく(親のstateをrunnableにする)
3. で、子供は子供自身のstateをunusedにせず(できず)、自分をゾンビとしておく
4. 一方、子から起こされた親process(running中)が再びwaitして、zombieの子供を殺し(stateをunusedにし)
5. 親processはwaitからreturnしますとさ

その時に親がchannel内で寝ないで、子供がexitする前に死んじゃうのがzombie.cのcodeなんだけど、

if(fork() > 0)
    sleep(5);  // Let child exit before parent.
  exit();

これでも、exitの方、上手く書かれてて、

// Pass abandoned children to init.
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
    if(p->parent == curproc){
      p->parent = initproc;
      if(p->state == ZOMBIE)
        wakeup1(initproc);
    }
  }

親がいなかったら、親の祖先を辿っていって、上の方に行くとinitが必ず寝てるので、そいつを起こしてzombieなので殺して
下さいとお願いするんだと。

なるほど上手くできてんね。ただpipeの例とかschedulerの回るoverheadが大きすぎだし、全部userlandでやるというGoの発想は確かに当然。今度Goのruntimeもみてみるか。