cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    * J4 d! Q: u% n* m( C3 M
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    % d' b: p; b) {# I4 w  P
  3. 9 E8 v) A. \: c( A1 K: B1 q- S
  4. $worker = new Worker();/ S7 R6 H  t7 `8 O' x+ p: C
  5. // 4个进程
    : b" O6 h/ S$ O
  6. $worker->count = 4;
    4 H( i+ c+ t* a* G7 _( [
  7. // 每个进程启动后在当前进程新增一个Worker监听
    " ?3 G/ q" W3 K2 S* v3 J
  8. $worker->onWorkerStart = function($worker)% F, x/ w7 Q# f4 I0 m8 D+ H
  9. {
    ! p8 x( k$ a& X( c* Z* @
  10.     /**
    5 b# [- l/ p/ M3 {
  11.      * 4个进程启动的时候都创建2016端口的Worker
      r+ o7 Q( ]. z& Q- W
  12.      * 当执行到worker->listen()时会报Address already in use错误. E7 @) E1 b( \6 e* i6 k+ d  G
  13.      * 如果worker->count=1则不会报错
    6 i; Z! a# ?  Q$ c( @" `7 N7 p
  14.      */
    : X, i. G0 q! m
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');% D6 m/ A7 n( ?
  16.     $inner_worker->onMessage = 'on_message';/ z/ {7 |6 U- f: F3 W0 n. V1 h
  17.     // 执行监听。这里会报Address already in use错误8 O$ Q6 O5 j5 x0 P% {
  18.     $inner_worker->listen();% F, ]3 K; s* z
  19. };
    / e# D7 M, k2 ]& O
  20. 3 E( x- ?1 p9 y- \# c; ~
  21. $worker->onMessage = 'on_message';
    4 f4 n) x/ B8 E9 p! t
  22. ) I; E$ F7 L# v% |
  23. function on_message($connection, $data)  `6 z9 o! n+ _- V  M, J  u
  24. {
    " D! T# I0 C( |; N; v9 ~
  25.     $connection->send("hello\n");
    $ b: J, s# ]1 T5 I8 Y
  26. }
    6 {0 f1 p) V) y+ ~- g3 [! N

  27. - [2 W1 P+ Z; x. n
  28. // 运行worker
    5 h- D9 X. g" l2 Q
  29. Worker::runAll();& z; _3 H  C  n
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    - y1 i0 J% I: \$ X  l4 h: r

  31. 7 s7 ]. ]1 L. }" o) R+ @
  32. use Workerman\Worker;. b. f% Q% Z; f& r% l  F
  33. require_once './Workerman/Autoloader.php';
    % n- K# s9 M, N' e5 R
  34. % v! p! O. g1 m7 G
  35. $worker = new Worker('text://0.0.0.0:2015');
    7 p& ]9 m! ~4 c* [+ m
  36. // 4个进程
    4 A' Q. a4 l0 p1 E" w
  37. $worker->count = 4;
    4 H' N4 s! J  h. N6 Z1 b) L1 w
  38. // 每个进程启动后在当前进程新增一个Worker监听9 p, g+ o) u8 R  L+ w2 ~
  39. $worker->onWorkerStart = function($worker)
    " R+ b- I; U0 a/ O* U2 d- N
  40. {, H" t5 j6 J" v9 k* M1 h
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');) I  W. `0 O; H% N
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0). K$ J% M$ U" @1 L* r2 Y
  43.     $inner_worker->reusePort = true;
      B( y9 y+ U8 g$ s. T  j- x
  44.     $inner_worker->onMessage = 'on_message';
    # q1 o/ b" T! ^- }* h$ T
  45.     // 执行监听。正常监听不会报错
    % u# {; j* [8 P% l& ~- W8 W
  46.     $inner_worker->listen();
    , O- I9 C/ u+ u6 b" O6 s) k
  47. };8 l4 k1 n! X" f; g1 s7 M9 F2 i- Y

  48. ! Y2 q' {) l: g. ~7 Q) S9 G
  49. $worker->onMessage = 'on_message';
    ; Q  q  u. ~( p  u* r
  50. 7 }- v8 q% p' i3 m# q* E# y% [
  51. function on_message($connection, $data)" b1 k' V! C# l4 |& d8 g# @+ s
  52. {
    & m7 X$ \/ F# e1 h
  53.     $connection->send("hello\n");3 ?( i* p* I* ]% e7 H7 t2 `! E* ^
  54. }
    * i# s9 H/ D+ P9 G
  55. 8 y! c2 R7 ]% o& W
  56. // 运行worker4 }. }8 t/ R- @* w2 a
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php: R' r& t, Z' T7 G1 k
  2. use Workerman\Worker;8 C! {9 d; ]+ M, j# M- K
  3. require_once './Workerman/Autoloader.php';4 {5 B, t/ l( D$ R0 ~
  4. // 初始化一个worker容器,监听1234端口6 b1 l3 ^, [# q, F4 Q9 E$ u, |
  5. $worker = new Worker('websocket://0.0.0.0:1234');7 a. X4 N8 ?2 y& ^* }
  6. ! q# `5 m+ X; ~1 s9 d+ o
  7. /*. G+ w+ b. V/ ]5 d$ i
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    + ]2 A: }, }4 y5 s- Z+ k
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)  s: u4 K1 ~/ ]4 d/ V$ D
  10. */
    ' \. m4 p% l5 P& Z* L
  11. $worker->count = 1;
    * C2 F& J( G! V0 S2 j+ ]
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, m; F; u) |  V$ ?% H) w9 C4 H+ s
  13. $worker->onWorkerStart = function($worker)
    . X! R- |$ S1 P
  14. {
      X3 ]4 {, A) ]$ w  S' j1 e* b0 {
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    7 f- x& p' _0 Z! b- O" E
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    6 K' C/ z/ l1 r2 w7 B
  17.     $inner_text_worker->onMessage = function($connection, $buffer)9 d  {8 f  f, H2 E, i! ?
  18.     {
    * E; E& I7 ~) _+ e
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据6 T0 ?8 v0 Y. x$ r: h3 U
  20.         $data = json_decode($buffer, true);
    ( d% p4 t5 w4 _* l
  21.         $uid = $data['uid'];
    + q  k% L9 e4 D; t5 ?% \$ o
  22.         // 通过workerman,向uid的页面推送数据( q: F: G# y) w8 h, R" I; Q* i
  23.         $ret = sendMessageByUid($uid, $buffer);
    3 g/ K' e7 n, ~1 b4 q: F. g
  24.         // 返回推送结果
    ) q) G5 D) E: w# D
  25.         $connection->send($ret ? 'ok' : 'fail');: o8 m* P7 k( E& O/ Q# J" ~! z
  26.     };% f% f0 g9 L7 @
  27.     // ## 执行监听 ##
    + E2 h8 n8 f. U' Q
  28.     $inner_text_worker->listen();
    ' h% z6 @% q- m* \
  29. };+ y# d6 P9 Y" @- x" W# R$ r+ g
  30. // 新增加一个属性,用来保存uid到connection的映射; u% u4 G: P( y8 ^9 G: m  a: |
  31. $worker->uidConnections = array();
    . G8 \$ o5 G/ d
  32. // 当有客户端发来消息时执行的回调函数/ W$ ~+ @- Q& h3 J; b; k
  33. $worker->onMessage = function($connection, $data)! `; b1 h- L# B5 A8 y& u
  34. {& i( K5 u' t& U# m. w
  35.     global $worker;% h' E3 I4 A2 J
  36.     // 判断当前客户端是否已经验证,既是否设置了uid4 W! T# F; o" I, o) e( U; G
  37.     if(!isset($connection->uid))8 B6 k, R0 i  \) ]
  38.     {
    * ^+ t; S( C# k6 }. y; r) B, s3 t/ J
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证). D: n3 D, S1 T3 O. E- e8 |" E( W
  40.        $connection->uid = $data;
    4 z$ l! a& B; B9 `5 P
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,; H/ p9 L* B; e0 ]! X
  42.         * 实现针对特定uid推送数据
      R5 ^$ ~. q" `/ J1 X  b' z
  43.         */
    & C: k+ {  b3 @- a- h2 r% q
  44.        $worker->uidConnections[$connection->uid] = $connection;' ^( g5 w! D! Z3 {+ a
  45.        return;
    8 Y6 v: B3 n/ g) j/ j
  46.     }+ C3 j% q" x0 @+ L7 V; z* [1 @
  47. };
    0 V: X! e7 o% V) {, }
  48. * L% c6 [: d: k  x: ~
  49. // 当有客户端连接断开时: {) E+ h7 ]7 H8 n! Y8 K
  50. $worker->onClose = function($connection)" ^) o, o8 g# w2 F! E0 i' G+ }
  51. {0 w0 x) Y& h0 M, `  b+ f' L
  52.     global $worker;
    7 |9 q, y- d6 w3 c4 b$ m
  53.     if(isset($connection->uid))4 ^* X% j# S  V, U( \3 t
  54.     {
    & F9 J9 s! \  D1 B; b: D$ |' ~. J
  55.         // 连接断开时删除映射
    5 _! c- \4 D0 X. P
  56.         unset($worker->uidConnections[$connection->uid]);. ]) P2 J; d- a  B; d; I
  57.     }
    ' u3 A4 ?( x, z: D7 E# l
  58. };
    . j  i5 N' c4 D1 s: i2 v, G/ h

  59.   O& ~" F  P; r4 p( P1 S9 G0 ?
  60. // 向所有验证的用户推送数据/ d8 L' d) C8 i+ E% ^* F
  61. function broadcast($message)
    $ C9 z) h3 Q( V/ n+ Y
  62. {& z1 c5 G+ D6 z1 D9 G9 W5 U( T
  63.    global $worker;
    5 @0 ~# ]3 c4 y
  64.    foreach($worker->uidConnections as $connection), \- q% }0 _5 m/ g$ |" G
  65.    {
    + Q" `5 a' W1 L4 _& S9 J3 U
  66.         $connection->send($message);& N. }6 l: l: u3 s& L5 h# r" c$ e
  67.    }5 m4 [3 d3 {; O" c
  68. }3 T/ ?  Y( T# N! x

  69. 8 p' Y5 X+ _* D" t" ^% A, ^, k
  70. // 针对uid推送数据
    9 i  y3 l1 d: ?' K" ^
  71. function sendMessageByUid($uid, $message)% C8 W3 Z) U$ ~7 M( Y& O
  72. {
    # V& W8 l, z+ @8 C  V# \
  73.     global $worker;
    ) M* @" C6 O8 o7 M
  74.     if(isset($worker->uidConnections[$uid]))6 @2 ]+ E4 B! ]. x4 s. N5 w
  75.     {- s1 o1 u& _! M6 L5 {
  76.         $connection = $worker->uidConnections[$uid];
    1 l- e5 _; ~# t3 |( D& i# ~9 N
  77.         $connection->send($message);2 t$ g- h- p+ c+ ]$ s9 e
  78.         return true;
    % t; d* `5 @8 J+ p( C
  79.     }- J2 r& ]* D5 D7 w  E& x( W7 R+ v
  80.     return false;) h! e, c5 ~( T' H0 ^; x; a9 y
  81. }
    2 F1 p. C7 z1 L' `6 b

  82. & \6 Z! \; W& w1 F$ n7 z  Q
  83. // 运行所有的worker5 B; ^4 F7 m7 j
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ; E+ g, g4 D' o! O4 \2 I8 o
  2. ws.onopen = function(){  J  ^& s7 ~0 V4 p4 E8 v/ Y
  3.     var uid = 'uid1';
      {/ ^/ q7 U. R6 W5 J1 R! h' M
  4.     ws.send(uid);
    2 B( I$ v( ~9 A; }. F! p7 l% @
  5. };: e, S0 D1 d: k% M% n8 Z! Y
  6. ws.onmessage = function(e){
    4 j! P0 d0 h5 _
  7.     alert(e.data);
    1 R: [2 E& w4 L
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    5 X" Y% ^6 ?, y% I8 Y
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 O) `5 ?% C3 m: a( ~- W
  3. // 推送的数据,包含uid字段,表示是给这个uid推送# ?8 m  i- a+ H2 ?' I$ D' i2 v
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    8 [2 X- a7 l( e3 d) [2 ^& h
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符4 i' o+ ~5 }- Z1 |
  6. fwrite($client, json_encode($data)."\n");
    9 A8 P& [& I4 m: \' U6 \
  7. // 读取推送结果
    $ Z2 |% ~. W% j+ G8 n3 W% e5 G
  8. echo fread($client, 8192);
复制代码
6 ]/ x! ~' |9 {( W) y7 i2 W
0 l3 O" W0 R, {$ w





欢迎光临 cncml手绘网 (http://bbs.cncml.com/) Powered by Discuz! X3.2