您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15232|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    * v  Q$ r: b4 E* m# @
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    * k5 `5 S5 r& F1 m5 W

  3. ) g, Q3 i4 c0 L4 k( ?" u
  4. $worker = new Worker();
    ' c# T2 l. L5 b! T, B- f& {4 W! P+ h
  5. // 4个进程. C& Y5 X0 y. Z+ H, p7 r( s1 o
  6. $worker->count = 4;
    5 D7 n; V, l4 `& f( z9 C( D
  7. // 每个进程启动后在当前进程新增一个Worker监听* m; g) e9 z: N( p+ z' m, w
  8. $worker->onWorkerStart = function($worker)
    7 [. R0 K1 K( E; l% y/ I  K% Y6 R
  9. {* \1 m- n6 o0 r6 H6 P1 I6 W
  10.     /**' f$ I$ e3 e' D* b) b" k$ E
  11.      * 4个进程启动的时候都创建2016端口的Worker
    : l7 z( \7 A" W3 C
  12.      * 当执行到worker->listen()时会报Address already in use错误5 e* z' d$ `6 ?$ j3 F1 M
  13.      * 如果worker->count=1则不会报错, A6 `7 S/ S& W; Y% j
  14.      */
    8 u2 O; I, ]3 x, D: P: g, x
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');& o5 B0 m2 {3 N
  16.     $inner_worker->onMessage = 'on_message';
    9 k; [* F: W9 d$ G9 u% p
  17.     // 执行监听。这里会报Address already in use错误- N* C& A" @4 ]' z* \1 x
  18.     $inner_worker->listen();
    9 ?1 B9 j1 m( @9 W, J4 {
  19. };
    2 Q" a9 Z! b$ [: T
  20. 9 e9 ?1 i) O, h3 y. d
  21. $worker->onMessage = 'on_message';
    " a8 I" b7 Z0 b& W: _
  22. 3 _* A4 ?$ N: ^# B- f
  23. function on_message($connection, $data)
    : q5 k0 E4 k' k" K
  24. {
    ; n4 q  S3 G! h  p1 M6 x! b
  25.     $connection->send("hello\n");2 ?- E9 H7 s2 w& I# ~$ U; j
  26. }
    $ w- g- ^" ^7 G  S( Y
  27. ' i' I+ y. }; B7 f
  28. // 运行worker- T; }) c; ?% N4 k1 M6 e
  29. Worker::runAll();$ p5 B- E/ S. U0 `: [4 p
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:) |4 k" w: ^) [# j; J) X

  31. 3 B% Q2 s! b  T  X3 C& U) Y
  32. use Workerman\Worker;
    / U* n, Y0 ^1 U
  33. require_once './Workerman/Autoloader.php';+ ]% ?4 S( S- A$ T  k: U& }; |

  34. 5 ?- E, b, Z' L2 t$ H5 y
  35. $worker = new Worker('text://0.0.0.0:2015');) Y. K' K3 H( [9 L/ d, h
  36. // 4个进程' [3 O* }, x- Z; k
  37. $worker->count = 4;/ ^9 E" R: W% V) X: b+ d
  38. // 每个进程启动后在当前进程新增一个Worker监听( E; w/ m0 `8 X7 q2 {$ a" S
  39. $worker->onWorkerStart = function($worker)* Q- m$ J2 s1 T$ U# C
  40. {
    8 ^& P* s8 Z, K) D# N2 Q( |
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');! e' R5 ~! x6 L/ w- ~8 Q
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    * J* k" j. F$ z$ ]: b' k
  43.     $inner_worker->reusePort = true;
    ( s/ O) G1 X# e) P
  44.     $inner_worker->onMessage = 'on_message';6 X' o  \5 k1 Z
  45.     // 执行监听。正常监听不会报错5 w, \, \) {5 [' _( ~! i& C
  46.     $inner_worker->listen();/ U! n' K. c% _
  47. };
    + I$ J; d  h% i& G
  48. 9 z. m% r% P* n  d: a) W. o# }
  49. $worker->onMessage = 'on_message';
    ; a4 W* Z9 Q6 D- _+ i$ f" f

  50. : _6 X3 C0 Q3 f* I" v
  51. function on_message($connection, $data)
    1 w0 n) o. ^# s* n1 o* H4 p
  52. {
    / e& C. a, i3 p% _. |3 Q) |, n
  53.     $connection->send("hello\n");* J1 ~) U  O! i; r
  54. }2 [& v- z+ ^8 Q; d+ u, E

  55. 2 \! l7 Q! _" U/ w3 ?
  56. // 运行worker0 k7 g  q" j6 W- d3 T6 P
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    3 X: ?" z( N6 y6 C: O. C
  2. use Workerman\Worker;; n7 u: Q- c6 B% p7 g$ h+ q# l, L$ _
  3. require_once './Workerman/Autoloader.php';2 I/ G; v2 W3 `: e5 i8 t/ M, z& M  V
  4. // 初始化一个worker容器,监听1234端口
    5 _. X7 i, C) C% r
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    2 Z* d" P/ M- e# k+ Z5 C' W

  6. 3 U% k7 ]7 E. |: c% J0 }+ @; o  r
  7. /*8 b/ E' i3 B" F+ r5 U
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误  _, ^) M6 s. S! w6 j
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)4 m+ D1 @9 l1 o
  10. */
    ; b. R* n' u& p
  11. $worker->count = 1;
    , r9 g# B' f6 S6 y, J
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    9 y- S. d( d6 n7 j% W, @
  13. $worker->onWorkerStart = function($worker)
    ! ~' B' q  V  \
  14. {
    / \1 g2 Q3 r1 x* [: I# J
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    3 A" o0 c# D: ^6 `9 T
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    - s) k& o7 W, J- T4 |9 N; @4 N
  17.     $inner_text_worker->onMessage = function($connection, $buffer)) o/ u/ ?$ s' q, ~: V
  18.     {8 w3 |& u, `: G3 [' u0 ], f
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    1 W0 x, [" z  I. [% \7 f
  20.         $data = json_decode($buffer, true);3 V1 t9 {/ i. }
  21.         $uid = $data['uid'];
    * m( e9 N1 s  ?+ g" B; a( x
  22.         // 通过workerman,向uid的页面推送数据9 r9 V  S8 r6 z$ r# t0 J  s
  23.         $ret = sendMessageByUid($uid, $buffer);& C% `" ?  `+ h4 C8 ^; D
  24.         // 返回推送结果
    % K" p2 p8 G8 P  ?
  25.         $connection->send($ret ? 'ok' : 'fail');1 F" W: f- `" o; C
  26.     };1 J( Y2 D9 j6 P" S4 G# B( t
  27.     // ## 执行监听 ##
    ( P, x5 i3 t1 e7 P  T$ S, m1 k
  28.     $inner_text_worker->listen();
    / w( M* W9 L) S- e
  29. };
    ) l! E- p* X& _! s: h
  30. // 新增加一个属性,用来保存uid到connection的映射
    8 G+ H) r( l+ N. \& l" x. o
  31. $worker->uidConnections = array();% F0 M0 K! z- p9 Y: \
  32. // 当有客户端发来消息时执行的回调函数
    / l% ]7 _) k# R* r
  33. $worker->onMessage = function($connection, $data)
    * o7 k; [# l5 L& i+ [5 G
  34. {
    3 i( P4 C& M& b( M! M7 y
  35.     global $worker;1 r" ?% x/ x- u, [/ a
  36.     // 判断当前客户端是否已经验证,既是否设置了uid+ ]  I3 \% \5 D' _
  37.     if(!isset($connection->uid)); D' u7 @: X& p  B
  38.     {3 t3 Q( Z- V4 H* I8 D' a$ Y1 T
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)6 ~( m/ M1 ?0 G6 y/ u
  40.        $connection->uid = $data;4 ]& ]8 {4 `6 D+ |/ p
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,* A" \, I& e- ~7 v
  42.         * 实现针对特定uid推送数据. \+ z1 d/ V: A: [
  43.         */$ L2 {0 x' [2 Q3 ?! }" Z7 ]
  44.        $worker->uidConnections[$connection->uid] = $connection;
    5 n9 I! |2 P" \& Q- }2 t& e# i, G
  45.        return;: v1 x$ {6 y3 k! A) f6 L
  46.     }
    8 v) z; N7 S# B
  47. };
    5 K& U7 W9 `  n! ~! z

  48. # ?, X* R4 f/ t9 P+ F
  49. // 当有客户端连接断开时
    - r: b2 \0 v# w2 w5 d! D) }+ k, V! Q
  50. $worker->onClose = function($connection)
    - [6 c8 j2 X8 s# F
  51. {, G- P5 R2 r2 m) `3 o+ y% V2 C+ B" _
  52.     global $worker;
    5 ?6 {8 [% L  `/ b
  53.     if(isset($connection->uid))
      ?  }: i+ ?3 R# J
  54.     {7 P% J$ G5 H) I- a* b6 e" A
  55.         // 连接断开时删除映射0 ?+ ]! ~) B' _4 H+ t: _
  56.         unset($worker->uidConnections[$connection->uid]);
    ! \( X1 r, l1 Y5 G
  57.     }
    # s- ^. K) N: c# {
  58. };
    2 r" i4 Q" Y* M* j" o
  59.   ^: f, ?$ n1 V) \& r! d. O
  60. // 向所有验证的用户推送数据$ ^- L8 M6 K4 ]3 T; ?* t1 T+ a2 {
  61. function broadcast($message)1 p9 C+ k6 p! p3 R+ r2 _6 T- H
  62. {
    $ i1 i/ v4 K& R
  63.    global $worker;& o7 e* t- {3 w
  64.    foreach($worker->uidConnections as $connection)
    $ b& ^* ^- a9 R% m7 M
  65.    {6 \1 Q7 E% Y* K
  66.         $connection->send($message);8 t2 K2 C7 d! {  l* b- U: T
  67.    }- g% R/ M: V, d: L! o. j: w
  68. }
      n: J4 N; o7 H, U' P7 D* @

  69. 3 {0 \: P5 n0 ^. P4 v2 h
  70. // 针对uid推送数据% b, y8 {! w& K- Q0 P3 d/ V/ t9 k5 a9 g
  71. function sendMessageByUid($uid, $message)
    8 j  n! [9 [" Q. a
  72. {- x' v: u9 I2 q7 a+ s/ v) X- {
  73.     global $worker;
    # Q, @7 _. U; ~1 G9 Z7 t- f
  74.     if(isset($worker->uidConnections[$uid]))
    5 G. n+ H  d8 s; s( d
  75.     {
    # v) \) F7 D4 O; i
  76.         $connection = $worker->uidConnections[$uid];
    3 U% b# c8 y) [$ D
  77.         $connection->send($message);
    ) D# i" T/ X$ A
  78.         return true;$ T* r3 B9 c/ r* n2 }2 l
  79.     }$ U6 }% h5 M' z& @! n6 I! j1 l# c
  80.     return false;& y8 o; a" [4 _
  81. }
    ; s2 x  v- l  I
  82. 0 `; J  @- ?1 R' k. S! {8 f
  83. // 运行所有的worker7 K6 ?# I1 D3 L2 `
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');% z+ J, M. T7 u- P' K
  2. ws.onopen = function(){
    8 s' s# A5 z# r! P" v' d- Z
  3.     var uid = 'uid1';/ l/ ?0 C# ]3 Y# X6 |4 g! A7 \
  4.     ws.send(uid);
    4 K3 D: Q, I' h7 _% U( {5 ~
  5. };
    2 e4 ^5 E* @5 I- E! W2 g* h  r
  6. ws.onmessage = function(e){$ w( d. G2 X; n. y- g$ C- O* Q
  7.     alert(e.data);& Y8 a1 O4 l; o7 y% j
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    " K$ d! v4 I- Y" y4 ~' U
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    - p% ?  a( E- J: T" H# `
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    % y! _& }4 V( b2 O0 ]* b
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    8 q4 B6 T/ h2 k( \4 Y3 _9 v) c5 e
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符2 R* h( @; l' G; M1 H
  6. fwrite($client, json_encode($data)."\n");8 \. a" f# j. U8 Q! a. b
  7. // 读取推送结果
    - R5 D+ F! i' ~/ @: \& q! P) ?4 ]5 @; f
  8. echo fread($client, 8192);
复制代码
+ a- Z- e* h( Y* C" P- G" R  q
% R  G% x" p% f5 S' }' _
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-20 05:40 , Processed in 0.078983 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!