您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15077|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    6 a1 v( r* @. r0 }1 u% v
  2. require_once __DIR__ . '/Workerman/Autoloader.php';9 G0 d$ B% ?1 d- ]5 Z/ C' H- c& d

  3. , ^, K0 a, g' X8 J
  4. $worker = new Worker();1 U3 }$ f& L8 ^: g! q
  5. // 4个进程& t& |) Z& I. {2 M2 H! h/ d6 p
  6. $worker->count = 4;" ~# m- x  E: Q2 a
  7. // 每个进程启动后在当前进程新增一个Worker监听: I  M9 B3 m; M( {# e# E. f
  8. $worker->onWorkerStart = function($worker), [4 K1 ]. }4 ]7 s3 J
  9. {, N0 K: A# J3 u& Q4 c, ^/ q  d
  10.     /**
    3 C/ o& _6 U8 x6 U. ^
  11.      * 4个进程启动的时候都创建2016端口的Worker
      R1 u& f, ^" S3 w9 j+ }
  12.      * 当执行到worker->listen()时会报Address already in use错误1 E, A3 {( ^$ _2 X5 m! D1 l
  13.      * 如果worker->count=1则不会报错0 Q0 j5 z  Z4 F! d3 B
  14.      */" s& ?) v% L& P8 Y
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    7 [; }) i7 q2 P( x( x
  16.     $inner_worker->onMessage = 'on_message';- N" `: A" ^, ?0 M. L
  17.     // 执行监听。这里会报Address already in use错误* q. O$ G# O' ?
  18.     $inner_worker->listen();
    2 V: L9 S2 E! u+ o
  19. };
    ! d& z  ]4 j: A5 a# F- S5 l

  20. , r% ]0 T; E# D7 V
  21. $worker->onMessage = 'on_message';8 U0 U- l. W1 d; |( I' C  Z, q
  22. ) X! U4 M* Y5 y3 P: m
  23. function on_message($connection, $data). H! }5 s( g/ ~2 R( H  [' Y
  24. {' }$ g/ _) X' B5 B
  25.     $connection->send("hello\n");. N/ A9 _9 v" }3 K% Y% h* O
  26. }2 t+ A, @% I: x; R$ i, ~
  27. . d) w% _. A1 X+ O4 {
  28. // 运行worker6 f) W" G2 G: L- a% h. A$ B2 _
  29. Worker::runAll();- v/ S9 m% w& G) E1 u+ Y2 h9 o
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    * J# |+ Q" k( F9 W' E

  31. " E) h$ I% r$ e/ e
  32. use Workerman\Worker;
    " m. t, H+ c1 C0 `: _# D/ w! e% I
  33. require_once './Workerman/Autoloader.php';
    3 S2 D5 o% w$ I

  34. 5 @4 I  Q$ e0 C# B
  35. $worker = new Worker('text://0.0.0.0:2015');* a' M) ]% M9 k& r1 t% M
  36. // 4个进程
    ; b3 L3 y0 `! \: ~4 Y
  37. $worker->count = 4;
    $ o4 K. p; e' s+ u: x: }
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ' n4 {2 i, N1 o" R# U# x3 G; K  o7 D
  39. $worker->onWorkerStart = function($worker)
    1 {5 M4 D( V& l4 q
  40. {) L; c, N9 M9 `  v
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');# w5 K' O9 ]# Y% q) g
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    * f5 E* R8 }' q* }8 r
  43.     $inner_worker->reusePort = true;! q! n3 Q6 h& q4 r, Z
  44.     $inner_worker->onMessage = 'on_message';9 `+ m& L! s! V+ {& g8 q1 \
  45.     // 执行监听。正常监听不会报错/ _  [$ h- x- Y
  46.     $inner_worker->listen();+ }( l4 I) n% x; i, B+ ~
  47. };9 G4 \9 {# R/ f! L

  48. ( T. |4 m, x, W' f  U9 g* j- q) k, e
  49. $worker->onMessage = 'on_message';! \# o; v3 v; v: Y1 `" h0 F: e) ?

  50. $ Z0 s! K5 s5 D2 M
  51. function on_message($connection, $data)) n# l/ y; _+ e' J6 s
  52. {
    - d5 g% R; d9 W. ^" v; ~, O  n; }& Q
  53.     $connection->send("hello\n");2 m8 r( T3 t7 H, C
  54. }: L5 N+ f1 Q/ ]. f8 c: i8 ]% y% v
  55. 4 _- o( T# v1 ?
  56. // 运行worker# x) s3 ?" l- |) p2 N! y2 k9 A
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php5 B3 S; L/ N. Y7 U
  2. use Workerman\Worker;! ~$ Y! B$ h- W/ V3 X
  3. require_once './Workerman/Autoloader.php';
    % V* t1 _( b; z, }6 Y8 A
  4. // 初始化一个worker容器,监听1234端口0 @4 [5 [; U$ c9 P% f
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ! ~3 T3 u( A8 ]$ w3 ~) N
  6. + ?" W  F2 {8 t7 f- F
  7. /*! R/ O. O9 n- m) A" ~& t6 `
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    ( p& _+ |. i/ H# X  j
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    3 c7 P- B' n0 J6 |
  10. */) R9 H0 Z' m* ~' q
  11. $worker->count = 1;
    ' s3 a# j0 F. ]! R7 x5 Z0 h
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    - G% R7 _, C+ p& j/ e% B
  13. $worker->onWorkerStart = function($worker)& c7 ?. ]) W% d4 F* v
  14. {# S$ K& I2 B5 f0 g4 S! Y! y
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符6 F# ?* y9 j! n
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    . o! K" ~9 y0 k4 [
  17.     $inner_text_worker->onMessage = function($connection, $buffer)4 J( l* B: b' O( v
  18.     {! X2 H$ r4 p( O3 M/ J" r
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据2 N& u9 q& W9 ?" _8 K- Z
  20.         $data = json_decode($buffer, true);9 |. y0 ?* D% R) m) O' A) k! O5 l
  21.         $uid = $data['uid'];
    5 [6 t9 m# f1 P3 L7 S
  22.         // 通过workerman,向uid的页面推送数据: a# }0 {& l* y2 B2 Z- w5 i
  23.         $ret = sendMessageByUid($uid, $buffer);
    ( S. p' P+ E2 R! E) Q/ @+ g$ O
  24.         // 返回推送结果
    ( C! z6 B- H- Q1 ]4 D8 _
  25.         $connection->send($ret ? 'ok' : 'fail');
    3 ^5 z; l( ]2 O8 c2 W5 Y+ |0 k
  26.     };) l* ^+ o' m; m( D; @
  27.     // ## 执行监听 ##
    ' `  c  M" t$ S' O: W& F- w& R2 K
  28.     $inner_text_worker->listen();; o- I; G( ]8 c% s5 n
  29. };
    7 R2 Y4 R2 Q2 p4 m& [( j8 N. N8 ]+ Q
  30. // 新增加一个属性,用来保存uid到connection的映射# w; Z( ]" B. N  s! d2 h
  31. $worker->uidConnections = array();* ^9 ~/ A! _4 H, ]) K* W8 K# \
  32. // 当有客户端发来消息时执行的回调函数; k- S" F: O5 J3 M- T
  33. $worker->onMessage = function($connection, $data)
    * a' ]" Y) Q6 a& F( s- Z
  34. {
    3 ]0 W* Q5 ?1 `! Y7 F9 a
  35.     global $worker;
    ! |6 N. `) c! v% K1 p+ O* p$ i% |' ~9 Q
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    & ~6 H( u% ~- s4 ~5 W  i- ~3 i
  37.     if(!isset($connection->uid))! ]+ ~5 j+ P9 |- C" F3 [
  38.     {
    7 ^2 Y: p4 i8 L+ K
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* c' k3 ?8 y% [! m# z  Q4 D! J
  40.        $connection->uid = $data;
    2 U( @$ o, d8 }+ Y. }
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection," h$ [/ e% {/ K% v5 `
  42.         * 实现针对特定uid推送数据
    6 [% f: O$ s" O% e
  43.         */7 f+ \5 b- n, I5 i- y6 W. D: d5 L
  44.        $worker->uidConnections[$connection->uid] = $connection;
    / l5 y7 D2 ^9 f' Z  f8 \7 U
  45.        return;
    7 e/ W, M! v; |/ c/ j: _: ^% O
  46.     }5 |0 }2 J5 N  g$ k4 z5 t# d: Y* |
  47. };1 y3 H; K3 m' |$ n8 e

  48. ' e( q  D# N3 @" P& a4 l$ h/ }) h
  49. // 当有客户端连接断开时
    / h5 g$ W) F. A
  50. $worker->onClose = function($connection)
    3 ?. i& o4 _9 `
  51. {
    $ j' g/ E$ a5 T2 e# }9 J
  52.     global $worker;
    $ e9 i" K* l; O) \& {  C
  53.     if(isset($connection->uid))
    # P9 q. _8 p0 t
  54.     {
    6 Q5 J/ ~5 k  V2 G
  55.         // 连接断开时删除映射
    . [$ d, x  ?9 V, F
  56.         unset($worker->uidConnections[$connection->uid]);+ T3 P5 W$ z) z  [4 ^9 R# m7 X
  57.     }
    $ f5 s: s! U& M2 E
  58. };
    2 G. o2 P- ^2 _1 Y  X

  59. . `* `9 I# }, p2 c
  60. // 向所有验证的用户推送数据2 c1 Q- |& t# t) s
  61. function broadcast($message)
    0 T. ?- P# f( m7 ]3 G! i
  62. {! ]3 [; n5 m3 o+ Q- H
  63.    global $worker;! a" m$ R: u+ b4 X
  64.    foreach($worker->uidConnections as $connection)
    . Q/ b% ?2 V/ n* o; P0 J, r
  65.    {$ l4 Z) h1 B0 J! w/ F
  66.         $connection->send($message);; F0 I* M. a& C8 n
  67.    }5 a/ _& ]0 M8 n
  68. }
    , K8 ~! i& D% E6 l
  69. 3 I9 ^7 X  R$ F9 ]
  70. // 针对uid推送数据
      o& v; X; }7 E7 k0 u
  71. function sendMessageByUid($uid, $message)
    ; P0 Q1 q2 g; l' m& s
  72. {3 }2 O( ^  b$ Z5 g1 A: |* g( H, B  q8 J
  73.     global $worker;% @) n8 g# v" t
  74.     if(isset($worker->uidConnections[$uid]))
    & T( P' w) A( N
  75.     {
    : n  ^* {. I+ Q8 K+ B  p& N: C
  76.         $connection = $worker->uidConnections[$uid];
    ! S4 H3 v- m/ j5 D! Z6 H
  77.         $connection->send($message);/ m: H2 J5 ]) F( s2 n) H* \6 U
  78.         return true;
    ; o8 P5 |8 T. p! Q7 z2 S9 n
  79.     }
    + H4 d# E2 H% ]8 r0 f9 l  F
  80.     return false;9 y3 F" S0 |1 j6 E
  81. }  z6 |! C/ }/ Y# W" e8 @
  82. 8 ?" T' t7 _3 y% ^0 Q& |$ ^
  83. // 运行所有的worker
    , B2 t7 _5 ]- d! W8 R2 l2 F
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ' B! A* J' x) M
  2. ws.onopen = function(){
    & ~$ ]7 C. {! V
  3.     var uid = 'uid1';
    9 Y" M  S& U* A& B6 i" g, @
  4.     ws.send(uid);* y9 e7 A1 }. K% ~3 r/ f. ]
  5. };# Z# y3 K4 s' {
  6. ws.onmessage = function(e){. W3 ~- ?, W/ h( C- U
  7.     alert(e.data);4 A! i6 T* d8 ]/ C, s
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口. y, g' t. U+ s2 m, |: B: e
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 M! _' b4 ?' T: \
  3. // 推送的数据,包含uid字段,表示是给这个uid推送. l3 z1 X6 c6 O1 H( P! _
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');: Q; z3 K  d9 D/ O+ V5 H  h8 N! k! G
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 I1 z+ d2 ?: y/ O2 c
  6. fwrite($client, json_encode($data)."\n");
    5 m+ ~6 S5 g; f- T
  7. // 读取推送结果3 m& u3 ~2 X9 F2 R1 b' {; G
  8. echo fread($client, 8192);
复制代码
" V+ |, L/ w1 \( m; |
$ x8 g' G  H& B$ Y
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-5-2 14:52 , Processed in 0.083141 second(s), 22 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!