您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14817|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    - J+ e7 D; O, O) l, H( w4 z
  2. require_once __DIR__ . '/Workerman/Autoloader.php';& @' W% S7 D7 ?% S; M, \% |1 k
  3. % Y) V- G" I( N! M4 C- i+ o% n
  4. $worker = new Worker();* Y( S! b% Z( T9 l' s
  5. // 4个进程0 }7 k% I. f5 k# E  E
  6. $worker->count = 4;0 R1 _& I% k. W6 b
  7. // 每个进程启动后在当前进程新增一个Worker监听0 A" E  p, C# P  t% K
  8. $worker->onWorkerStart = function($worker)4 b$ G4 [# y8 ^  {
  9. {4 x+ n. ~7 y7 S, A, b# _, k* |
  10.     /**
    ; j0 }& K3 ^1 |. x* y! ^; r
  11.      * 4个进程启动的时候都创建2016端口的Worker
    * f2 S9 |5 h$ G  n: s0 O4 J
  12.      * 当执行到worker->listen()时会报Address already in use错误, G& c/ W5 d- k) v( H/ ?) F
  13.      * 如果worker->count=1则不会报错
    & E: ^1 X7 @6 p( ~1 d
  14.      */1 I6 q0 r1 V! x4 u
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    . k* P( ?1 x1 f+ T* V6 o
  16.     $inner_worker->onMessage = 'on_message';$ {5 F7 `$ |+ z* }* }  u2 g
  17.     // 执行监听。这里会报Address already in use错误: c, L; n/ G9 [7 J' {9 N
  18.     $inner_worker->listen();
    ; b$ }+ l2 N/ f7 r
  19. };
    4 C5 q$ N5 ~5 ~# c

  20. ; m  Q4 w& R  j
  21. $worker->onMessage = 'on_message';
    ( X; F; y9 M' z( r$ c: U5 T- G) C

  22. & P; K* \9 M; j; `3 J
  23. function on_message($connection, $data)
    5 l; C* P1 g+ {- w
  24. {0 q" _! n  r# p- B6 m! F
  25.     $connection->send("hello\n");
    6 Z5 ?" c7 X  n1 f* C9 g% F7 c6 A
  26. }
    4 I# Q: n) E, j7 k
  27. 8 K- m3 v( n9 K- {; `
  28. // 运行worker( k6 I3 R% M# j1 D. g
  29. Worker::runAll();
    2 v9 r* @7 ]( ?* t, o
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    2 Q1 \( X& x# c3 R0 N* _  J

  31. % x- _8 W  k6 A
  32. use Workerman\Worker;% z8 _7 w6 y4 T" D6 y& e
  33. require_once './Workerman/Autoloader.php';* ]' P  O& I5 A, w
  34. 7 |, f& n! ~* s7 E  Q" {: M; C! v# M
  35. $worker = new Worker('text://0.0.0.0:2015');
    4 c# V) R0 @. x
  36. // 4个进程$ z+ a+ b4 |4 _9 ?
  37. $worker->count = 4;
    , r# g0 ?) o2 c% Q# w, W, M
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 ~, `$ }; b: l2 @9 s
  39. $worker->onWorkerStart = function($worker): [0 b/ J/ {8 e9 a7 W$ [
  40. {& G2 W; s& o: J7 ~% g0 F
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');8 Z8 g+ i# a( ~. C* S
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)6 @" c2 C- Y* K% `
  43.     $inner_worker->reusePort = true;
    / ]5 t4 p% K+ ]" y- j) L
  44.     $inner_worker->onMessage = 'on_message';
    3 G6 |; I4 z. L: s( ~
  45.     // 执行监听。正常监听不会报错1 j% P3 U4 k6 v5 l5 s
  46.     $inner_worker->listen();
    ! M/ S$ ]7 o9 x" Z  w: p) z
  47. };
    - J7 y; H$ s" _2 u
  48. 1 H/ _& e1 U9 R1 c. S; w" N" q
  49. $worker->onMessage = 'on_message';& k0 @2 M* [% l: f, F% O4 I

  50. ' C/ L# N! F3 l; n
  51. function on_message($connection, $data); p% Z0 I* T! n
  52. {
    , @0 O" |& H. }2 p6 h
  53.     $connection->send("hello\n");
    6 B- t6 S& `8 u' i! m
  54. }
    % I2 q: N: W/ r& d! _# A! q
  55. % b1 [, J9 P2 S( P
  56. // 运行worker$ L" X7 f8 a! w0 o9 b2 i  y
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    1 p1 N  ?/ m# `# X7 P" H
  2. use Workerman\Worker;6 l7 ^" g/ H6 M- p" E
  3. require_once './Workerman/Autoloader.php';9 m- G( I  f# N: f8 g! }+ w
  4. // 初始化一个worker容器,监听1234端口/ r: D/ n: y9 |: m! s5 \( x
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    , D$ w! j2 P, }
  6. 3 q) i6 X9 ?: d7 L2 ~* D3 J. T; J
  7. /*
    1 r2 E& p! z' o6 i
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误" h& k  b3 s. Q+ S* _
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true). p$ j8 M/ i0 f4 k
  10. */
    * D: g$ x0 P* Q" b% S2 S/ C
  11. $worker->count = 1;! T' R/ i; L* v1 t* h
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口/ y, b* S0 `! }  i+ B/ E, n
  13. $worker->onWorkerStart = function($worker)1 I$ b' _. O2 L
  14. {
    2 b& G; D% B' V$ `$ P" s
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符" S' P5 [9 x- K  L
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    1 V* u6 B2 s& i/ E+ O
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    8 t# d" }  q' [/ I  o9 S
  18.     {9 n0 ]& Y4 I  e0 N# A- E4 z
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据' z0 L4 L" r# `) `
  20.         $data = json_decode($buffer, true);0 o3 F: E- J- L
  21.         $uid = $data['uid'];
    . N4 y) }5 Z7 k/ ^) p1 j
  22.         // 通过workerman,向uid的页面推送数据
    " {1 T4 \: @+ H4 P% |0 M! N
  23.         $ret = sendMessageByUid($uid, $buffer);
    * k. u7 |. [2 r6 c* U1 b* I, N
  24.         // 返回推送结果
    4 ?( x' A: `( O7 |/ p* C
  25.         $connection->send($ret ? 'ok' : 'fail');
    6 s4 U8 I. C( C/ T: u& L, X
  26.     };
    / t+ R+ ~# L0 q  R$ s
  27.     // ## 执行监听 ##: T. M( S' s3 g6 S" ?7 e
  28.     $inner_text_worker->listen();
    . e4 k0 U/ {0 j& g. m8 Q2 u
  29. };! S0 V. _: m* \+ U
  30. // 新增加一个属性,用来保存uid到connection的映射
    ! v/ r* B: y( j) `% M, C5 J
  31. $worker->uidConnections = array();( t9 k" j2 k4 a! w# N& d' D& e
  32. // 当有客户端发来消息时执行的回调函数* @/ G' T5 i" [0 v1 o4 v
  33. $worker->onMessage = function($connection, $data)( W9 N6 Q% Z) h5 u
  34. {- o; ]5 i+ h5 V: F2 b
  35.     global $worker;
    . x" b. i% v" d
  36.     // 判断当前客户端是否已经验证,既是否设置了uid" T. G" O1 G" k; ^+ B- d% L
  37.     if(!isset($connection->uid))
    ' u5 y, u! Q- S2 o! [; G, `# j$ x/ ^5 u
  38.     {/ w1 s' f7 Z+ G0 ]5 E5 z' `
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ( O8 w" g( j! {. s
  40.        $connection->uid = $data;9 n$ o" G% S, R! |& c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    0 q; n, l& x/ z4 i7 c, Q: S! p
  42.         * 实现针对特定uid推送数据
    / I  Y# A2 B. c+ K4 C, ~  N, C
  43.         */
    - L6 F, a: E  j$ [0 l
  44.        $worker->uidConnections[$connection->uid] = $connection;
    9 M* s/ [; A4 T# w% y: t: D! I. ]( T+ u
  45.        return;3 j2 i+ o9 @; A2 e2 c+ P
  46.     }* O* _: V7 q2 c- [
  47. };
    $ J' ]7 z' d# i1 J2 U

  48. , B0 Y7 m2 J8 x
  49. // 当有客户端连接断开时3 p5 f' f6 u0 _, @: x
  50. $worker->onClose = function($connection), {6 i$ E2 F# ^9 S
  51. {; E5 Y9 Z, u  K* u) @9 S( R
  52.     global $worker;6 ^1 _* G! x4 x
  53.     if(isset($connection->uid))) i2 }4 f8 j; F' K6 v" j
  54.     {# |3 ]9 Y0 p& ~; J+ Z0 m4 t
  55.         // 连接断开时删除映射
    6 j$ V& W# }$ u% g, q. ~+ ?
  56.         unset($worker->uidConnections[$connection->uid]);  u0 [3 B- g2 ~* r, ?0 Q. l
  57.     }
    ( f+ b- B  T0 `& _9 y9 i6 D, Z% {: W2 a
  58. };
    ! u. }) o0 B% K. T8 d4 K& o
  59. - ?- [7 v2 r! r* q5 U1 O* I
  60. // 向所有验证的用户推送数据. N/ n) N$ Z! O/ N7 |& ?
  61. function broadcast($message)
    1 K- x! |- b# |' w4 S0 X
  62. {
    3 |" o/ C# {4 c" A1 p* g
  63.    global $worker;! J) V3 Y3 [5 _; b% {) Y
  64.    foreach($worker->uidConnections as $connection)( `9 G) g  v5 E2 e$ o3 G
  65.    {% s' H- I, m" r( i( L
  66.         $connection->send($message);" V" M) P* a- Z# X- d& s: c
  67.    }4 n5 O+ R3 O2 c& {
  68. }
    $ n( B: T$ F4 n# s/ S6 f

  69. $ @3 ?; q4 G+ k  }8 v6 I
  70. // 针对uid推送数据
    / G- J' L6 W; s& I  o
  71. function sendMessageByUid($uid, $message)
      q; K; @9 S' l8 n
  72. {
    + b2 R! w$ o5 ^
  73.     global $worker;
    1 \4 T# W% F2 ~' ^- Y/ a
  74.     if(isset($worker->uidConnections[$uid]))
    , a: |& T  @% T
  75.     {
    7 s. @  `) a! ^. o5 f6 K3 s
  76.         $connection = $worker->uidConnections[$uid];
    ; D8 T; \, v) e! U
  77.         $connection->send($message);
    . K! i, ~* c% b, p9 I: ~$ L
  78.         return true;( `* |7 f0 {% s% G- b
  79.     }
    4 F. [6 V9 h) }; N- |; j  d
  80.     return false;' k0 b! }: j* J7 o
  81. }) q& H& R" \% w+ I
  82. ! ~! U% L7 x: \5 x
  83. // 运行所有的worker0 t4 i; b; f( Z4 [; E- X! i. _' s- w
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    - Q6 V0 l. ?& A/ k! R5 I! |
  2. ws.onopen = function(){
    ( P: B, j) H! z, b2 D' {; P
  3.     var uid = 'uid1';
    ' A" m6 ?0 R  [" R  I
  4.     ws.send(uid);' x5 {3 T: g  B/ d
  5. };
    ) A" V" {% {: ?# u
  6. ws.onmessage = function(e){- ]  _& O( q. d" H2 w% z2 |+ T2 B" d7 G
  7.     alert(e.data);
    1 b. W3 A2 m: i* }8 _. T
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口7 `! q6 v* i+ x& b1 [* a
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    2 w! k" o. D0 e( `
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    " X; P, f0 x% C* M
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
      `0 W2 [+ v) w/ U
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    + u5 w( T+ B- l( I) t2 L5 ]
  6. fwrite($client, json_encode($data)."\n");5 u  V: _, o) V! n' N
  7. // 读取推送结果
    2 [2 e& o& t+ _
  8. echo fread($client, 8192);
复制代码
  I6 w$ I/ _% {& I! O
0 `! G8 n3 T% E5 g
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 22:02 , Processed in 0.060932 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!