cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;' W9 V7 V3 ]* _. ?* g! L
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ' q) |1 _" c) V7 ?" c

  3. 5 ^- \% \7 d  L4 u
  4. $worker = new Worker();
      w& |* ?+ J7 ~9 e. Q
  5. // 4个进程1 ~* N+ C; o4 J' x1 t, [+ P
  6. $worker->count = 4;
    1 R" p, M- f2 B$ k' A, l
  7. // 每个进程启动后在当前进程新增一个Worker监听2 K9 Q" d! Q1 d1 N; ?! L/ ~
  8. $worker->onWorkerStart = function($worker)
    ' y* L* ~  @2 Z
  9. {) u& A6 x; J& l5 e; N; I" U
  10.     /**) A  c% Z& e8 J" p3 Y: h' }: T
  11.      * 4个进程启动的时候都创建2016端口的Worker5 s. B/ Y2 `- l- V* a* g6 a
  12.      * 当执行到worker->listen()时会报Address already in use错误- j+ j! U( j" _) x$ T+ Q( g3 Y2 B
  13.      * 如果worker->count=1则不会报错
    5 w. \( h6 F7 K( i. y9 m  n; ^! z& B* i# F
  14.      */
    0 T# U0 U* z4 R% e' h. Q$ ~: l
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 f& [- ^7 r7 P5 O- o4 O7 O
  16.     $inner_worker->onMessage = 'on_message';
    , D) U; {/ k! r/ z) I
  17.     // 执行监听。这里会报Address already in use错误; L; Z: J6 x  [  r* l  _
  18.     $inner_worker->listen();
    7 L5 m' M7 U: {. |
  19. };
    " H& H8 e' z$ I) C$ t1 ^- _
  20. $ a1 Z! O. Q% Y
  21. $worker->onMessage = 'on_message';9 V; F( F! n9 x! F+ w$ P

  22. " E" `- n7 y1 G7 N  M: o, S
  23. function on_message($connection, $data)* X; K9 [& c$ f* `! g. V
  24. {) p% r9 K" k. u# K& I
  25.     $connection->send("hello\n");
    * k* E! _  M* T4 g
  26. }) h* N+ k$ ^. U4 y' I

  27. 5 B. x3 c) q7 J: ]: x$ j
  28. // 运行worker
    + ^8 T' [; N* `; i2 ~
  29. Worker::runAll();- K+ g. F, K: f* e1 e* k
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    1 e: Z" A, r$ v

  31. . L9 Z. J1 V/ P
  32. use Workerman\Worker;+ q( U3 v' ^" {* N2 E$ h( b4 p3 o
  33. require_once './Workerman/Autoloader.php';# ^7 K2 q3 u1 h% j+ e; Z# i2 x
  34. 7 k& V$ t% m( B& m, a+ L
  35. $worker = new Worker('text://0.0.0.0:2015');
    ; T: E9 Q" d% F- s. u( t" q6 T
  36. // 4个进程1 n6 |0 C6 n. X+ m, q4 H% {& S
  37. $worker->count = 4;( V4 k# x  r7 i- g
  38. // 每个进程启动后在当前进程新增一个Worker监听. e, }, v3 i# K% P4 ?
  39. $worker->onWorkerStart = function($worker)& u; W2 M! u% W4 v6 l8 C+ u# a8 }
  40. {' m; b4 d! A4 m2 n$ }/ d7 g
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');7 i6 a( {8 m& w
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    5 F7 I0 W1 W5 T
  43.     $inner_worker->reusePort = true;
    - |, a: R' k" }2 }7 m
  44.     $inner_worker->onMessage = 'on_message';- Z! u/ ^& _  v1 B
  45.     // 执行监听。正常监听不会报错  H& \+ T$ D8 h4 @
  46.     $inner_worker->listen();" u) w8 X4 J; g: Y$ e
  47. };
    & O" ?) m( L( U5 L  b
  48. + o4 b* u: p: L) Y& I
  49. $worker->onMessage = 'on_message';- T% I* y: b: a. o/ `, E
  50. ; {& @6 L' h6 j% F. u2 Y& B) I
  51. function on_message($connection, $data)2 ]5 d& p/ p2 i, p& a  C% u7 R
  52. {% @( _+ g+ v* c
  53.     $connection->send("hello\n");( e0 y9 X: t; _, F
  54. }7 }3 X( e1 n( q4 i) g) x/ Y% E6 M
  55. 3 g) V3 O) q9 \2 p! R7 _
  56. // 运行worker1 c/ O! b  A5 h8 e% T" r
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php6 ~& o* u0 W  I7 |  ~
  2. use Workerman\Worker;
    / \! h" Y& |: M1 m" e
  3. require_once './Workerman/Autoloader.php';& n: l& v; @8 v
  4. // 初始化一个worker容器,监听1234端口
    ; q4 t% F8 {, t/ j) I
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    % @* g  ~/ \1 \2 c' M

  6. ) x. [' p/ I$ F' X! u
  7. /*3 z/ `3 K* W: v
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    ' s) ~5 Q" C4 l
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)- ^$ w& z2 p8 j0 j/ F9 y
  10. */* m% D; D4 T$ Q) r3 ]
  11. $worker->count = 1;
    4 f9 p  L6 i" Q( Q: i, {% M
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    , C7 x0 W; Q) ^: C- j7 x# e
  13. $worker->onWorkerStart = function($worker)* `+ F+ G& W6 r
  14. {
    ) K1 J5 L- i7 \5 M* o
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ a5 Y- ^7 B& p1 a# k
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');* B7 w( t+ a; ~0 I
  17.     $inner_text_worker->onMessage = function($connection, $buffer), e/ U$ K5 x7 z9 P! L  Q
  18.     {
    8 S% G( U+ A2 p2 V+ a
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    . l& E% J6 H! j* i+ H8 t0 }
  20.         $data = json_decode($buffer, true);
    ) d. n2 D3 i: J, B7 {
  21.         $uid = $data['uid'];
    + ~1 t/ L' I6 s5 h) j
  22.         // 通过workerman,向uid的页面推送数据
    ) A7 f8 g) z8 P8 K. }" S7 u7 Y
  23.         $ret = sendMessageByUid($uid, $buffer);0 ?' j9 k1 u& H; x2 a6 r; x
  24.         // 返回推送结果
    ' M; n. P! e' Q& b5 s# S
  25.         $connection->send($ret ? 'ok' : 'fail');: L  i" j+ O8 E  J; H. S/ f
  26.     };9 F0 u1 k5 R% W1 A
  27.     // ## 执行监听 ##
    ; u. ]: J$ x% z+ X2 X* O
  28.     $inner_text_worker->listen();# w' F: D0 {) B9 A6 h; }
  29. };2 i+ j3 A1 z/ `& a
  30. // 新增加一个属性,用来保存uid到connection的映射) ^5 j, Q* ?! u
  31. $worker->uidConnections = array();
    : B) P0 C) s) l& \4 L* I) k) c. B
  32. // 当有客户端发来消息时执行的回调函数2 a$ @. z1 Z* }+ j! u% S
  33. $worker->onMessage = function($connection, $data)
    " s1 Y/ @# m- }: d7 k' }
  34. {$ b( }0 X2 D4 f) N$ \
  35.     global $worker;8 E# T3 r* Z) X
  36.     // 判断当前客户端是否已经验证,既是否设置了uid) ~: I" e- d' X5 h. Q/ k! z
  37.     if(!isset($connection->uid))
    ! O1 x; c" g/ r" y" J& v, S! E) w: g7 k
  38.     {9 M+ M# ^" \! H  a3 J# \" b5 q/ a8 T
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
      T6 m' g4 f# ?
  40.        $connection->uid = $data;5 i. w, }* s4 O
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,; t% c; m% d, @# x$ S& ]2 o9 i
  42.         * 实现针对特定uid推送数据
    + S0 I* `& O+ n7 Q  {
  43.         */. a5 ]! x0 A* U- h1 `. d( T1 q" V$ l3 @
  44.        $worker->uidConnections[$connection->uid] = $connection;
    2 }8 x, U$ v4 U# x/ V. H. B
  45.        return;
    - s6 B$ p# d7 I4 |$ ^4 Q! M' W
  46.     }. b, _, W$ @/ E' q0 V1 c0 L1 n& M
  47. };
    , S  a, R7 c) j% e  ~! L0 ]; r
  48. 1 |. [: ]! W4 g# W$ E0 j
  49. // 当有客户端连接断开时
    - [$ u" l! P4 K% ~" N
  50. $worker->onClose = function($connection)% q4 W5 r9 I& Y& G
  51. {
    2 }) o* a' r. [7 b% \
  52.     global $worker;6 U& d# z: Q. g4 z8 T1 ]
  53.     if(isset($connection->uid))
    7 a4 a' z4 J2 W$ z7 M) B
  54.     {
    # S: ~% x. d) z
  55.         // 连接断开时删除映射
    " r1 M% J; _- y  H& B4 I" H- T' w
  56.         unset($worker->uidConnections[$connection->uid]);
    - [, j- X% u. u% ^1 ?
  57.     }) G* A9 P: L% i; k, s
  58. };
    . c" f  s- O1 y

  59. ) l: X+ }) u; A8 l; o5 e+ ]
  60. // 向所有验证的用户推送数据- S' c5 a! Y7 m* p: Z: X7 |
  61. function broadcast($message)
    ; ^7 `0 R7 M7 Q. ^6 ^
  62. {
    / z9 I5 U  x1 G
  63.    global $worker;
    # Y+ b7 g3 E: G- A5 G
  64.    foreach($worker->uidConnections as $connection)/ M5 k- r  j3 z" W% i/ K& q5 F
  65.    {
    # H8 Y+ s% b; V# p0 _
  66.         $connection->send($message);5 y3 ^  x8 o' a
  67.    }
    + k8 U* f6 ?) ~7 o+ O" D3 E1 p
  68. }1 j9 n, A0 E; f( Z1 F& p2 g5 m* g% E1 I

  69. . ?" e+ K* ~9 W- y
  70. // 针对uid推送数据. g! Q: U1 N5 m8 }" |
  71. function sendMessageByUid($uid, $message)
    - s  a& T# d1 T0 p9 K3 N
  72. {
    : ?/ ?/ I8 \; p
  73.     global $worker;! @" n+ F# P9 O, i$ z
  74.     if(isset($worker->uidConnections[$uid]))' X- @" ]0 A" C' C3 ~2 O
  75.     {
    & k+ h9 G* p' j  ^$ B4 \
  76.         $connection = $worker->uidConnections[$uid];+ K7 H- a& y% X3 S% ^( x( u1 g
  77.         $connection->send($message);. y4 _7 g! p& x. I
  78.         return true;
    2 T4 t0 D7 D8 U8 r
  79.     }
    ( |9 E5 }* Y* D9 V
  80.     return false;  ^( c/ n% v  m/ f
  81. }
    ! ^, `4 Q# Z, p9 Q
  82. 0 X. k! N" y; x! G
  83. // 运行所有的worker
    2 a$ B( Z+ ^, a9 a, [  ]
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    " v  u$ }: x7 `
  2. ws.onopen = function(){8 r4 X5 s* D, e: K
  3.     var uid = 'uid1';
    6 l' B, ?2 Q" r, g5 i' X
  4.     ws.send(uid);7 }& R" ]8 w  H! F7 w; G- U6 A
  5. };
    1 e! c( x9 G0 v# ?- M, J
  6. ws.onmessage = function(e){7 o' \4 z" P* G* m' w- X, a( l
  7.     alert(e.data);
    & S6 Q8 \5 t( s+ u$ q* F
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    + h6 x- {" U8 A3 h2 K9 i9 n
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    7 T( E, S% m- V" S6 \" D* L  _
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    " [% O8 d- _4 A, C8 C
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    6 Q) N. j& p& N& m
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符1 y9 G  b" F1 I; V. d) ^& D1 l6 z
  6. fwrite($client, json_encode($data)."\n");! U% p% r+ P7 D0 U; m) G% I
  7. // 读取推送结果
    0 j8 Q) f* e" V4 M& Y" W' Z
  8. echo fread($client, 8192);
复制代码

/ u9 Z3 |" E% r" g8 J5 W
. H8 p' Y7 Q3 U% b; Q




欢迎光临 cncml手绘网 (http://bbs.cncml.com/) Powered by Discuz! X3.2