您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10489|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;; `2 e/ ]# N2 |0 `# g$ g
  2. require_once __DIR__ . '/Workerman/Autoloader.php';% O5 }2 P. ], l

  3. . K4 k9 N6 c  i, l) f
  4. $worker = new Worker();2 S) {4 k3 s/ L* w% h- G! G2 ^' q
  5. // 4个进程
    9 E& Z: ]- V) l& v- e) l( j
  6. $worker->count = 4;2 [( E2 {+ \9 C; _
  7. // 每个进程启动后在当前进程新增一个Worker监听
    ' q. z$ q2 s- F: A
  8. $worker->onWorkerStart = function($worker)0 J1 A  L; n) d, E  N
  9. {/ }0 r$ [0 `# ]9 v; w( {
  10.     /**& O: k" H: a9 r2 }
  11.      * 4个进程启动的时候都创建2016端口的Worker
    7 v- _  N9 T. _' d/ E
  12.      * 当执行到worker->listen()时会报Address already in use错误" E: x$ f" ]6 V. T
  13.      * 如果worker->count=1则不会报错1 M- m. ]$ n2 A+ d& ~6 b/ [' m
  14.      */% U( R) b' z$ A. X8 [9 @
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');9 Y6 ~& ~& O4 \& t0 K$ B( e1 g9 b
  16.     $inner_worker->onMessage = 'on_message';
    , N: {/ `3 |- P) d
  17.     // 执行监听。这里会报Address already in use错误
    . J# m4 u8 I/ D  R
  18.     $inner_worker->listen();; [8 L! i' j; {4 W
  19. };
    ; B) I; T- ?7 Z" y1 d
  20. $ [; Q1 a1 B) @* l! h% \
  21. $worker->onMessage = 'on_message';* o) ^, [& u$ ?, @# `

  22. ) j* F* ], x6 f7 z. e# ~! F
  23. function on_message($connection, $data)
      B9 c- Q; k0 n8 _  }! Z5 v
  24. {6 p/ \, M6 c# _
  25.     $connection->send("hello\n");8 h5 \) u& U4 y: o# d6 Q. k
  26. }* ]( `5 C; z- b6 k' X

  27. 1 e6 u9 e9 y9 S; e' o  W2 c- F
  28. // 运行worker
    6 M8 Z+ B2 S0 O  F( G* y6 S) y
  29. Worker::runAll();
    % u( ~2 z0 P3 f& B
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:+ _$ W! d$ Y3 R! s" \7 ~5 v/ ?& l

  31. 9 q/ l9 B3 X5 W) y; U
  32. use Workerman\Worker;
    & i$ d  F  u$ V+ ]% r) S
  33. require_once './Workerman/Autoloader.php';
    . J! Q7 Y7 l6 a) i9 Z5 k

  34. ; p& ?  ]. l5 t
  35. $worker = new Worker('text://0.0.0.0:2015');$ f/ |' q$ H5 M8 ^5 [
  36. // 4个进程9 F# V3 }" Z4 Z( E1 L% F8 C( Q& `
  37. $worker->count = 4;1 n/ g4 S2 L% w! t3 Q" i
  38. // 每个进程启动后在当前进程新增一个Worker监听
    & |/ Z0 w0 i# T6 N" u2 ^2 e
  39. $worker->onWorkerStart = function($worker)8 o5 W4 I  i! q' B" S5 q
  40. {
    1 `) G# p! n  `
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');4 e" r4 Z2 k' d1 Y% Z
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ' _5 L/ b' D7 {5 i! M
  43.     $inner_worker->reusePort = true;& c% B  T+ R' }2 p' q1 ?& c
  44.     $inner_worker->onMessage = 'on_message';/ c- Y  _: U7 J- F' l7 O/ z
  45.     // 执行监听。正常监听不会报错
    0 [  ~2 P2 P1 K7 A
  46.     $inner_worker->listen();. H- f0 W$ q% ?7 h  ?5 N' Y
  47. };
    & o: |" T: u$ W5 r* G8 A! a( r: r

  48. ) ~1 _* L0 H9 s, C3 P
  49. $worker->onMessage = 'on_message';
    / T8 p, _+ K; z
  50. * n- M! G5 F% a8 Y9 `
  51. function on_message($connection, $data)+ j2 f* F1 r1 Q" }/ G- r& r
  52. {: ]$ v1 `6 k2 j; |2 n
  53.     $connection->send("hello\n");
    9 [8 d7 {. S% I8 `7 Y1 y7 b
  54. }
    + c9 s' b+ P/ p7 U# [5 Z! q

  55. ; Z8 r3 ^) E! @! \1 Q
  56. // 运行worker5 A0 c  E4 B  F8 N
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php5 o  R0 n8 O0 P' x/ H! ]2 W
  2. use Workerman\Worker;5 x3 V, Z& A/ [: Q% z
  3. require_once './Workerman/Autoloader.php';
    , Y, }) ?2 U2 J" u" F6 N6 }
  4. // 初始化一个worker容器,监听1234端口! v. z3 e% ^3 A7 t
  5. $worker = new Worker('websocket://0.0.0.0:1234');1 |# C. F) w2 J( I5 u6 }

  6.   C: C, Q7 E5 l$ ~+ Q! k/ ?
  7. /*8 M5 d' Z% x" x  S% A' O
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    ( n+ T. W4 b9 E4 y& c5 T* v  |
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    7 [) f5 y8 C- s! C1 Z
  10. */
    8 F; z; u3 S2 n* h+ ~( U- s4 M0 Q
  11. $worker->count = 1;
    1 `- H, A0 Y# f3 Z
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口2 ^( u; [8 J8 @1 a
  13. $worker->onWorkerStart = function($worker)" ]# S  k9 \  w9 I
  14. {
    - g" b$ v3 l' B: b- Z3 G
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符0 ?" f4 p- V9 t+ g
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');4 G. P- P: [5 }  C; j: N
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    & `* e8 ]- K; }* L0 e  D: d) }) S' O* c
  18.     {
    * y- a) d" h' ?: W+ s. J" E
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    . y+ S& R4 \( M
  20.         $data = json_decode($buffer, true);
    - L/ X' h3 Q5 P
  21.         $uid = $data['uid'];
    4 e: [- `) q# J2 t. G& J
  22.         // 通过workerman,向uid的页面推送数据
    4 k& U- i, v$ v2 m8 k
  23.         $ret = sendMessageByUid($uid, $buffer);- Z. ?! d* Y0 S+ w0 t+ B
  24.         // 返回推送结果
    ; ?5 k: ^+ `# l. z7 e0 S4 {
  25.         $connection->send($ret ? 'ok' : 'fail');9 a: |" H! W- R
  26.     };
    5 l6 w9 l2 R7 J( m9 O
  27.     // ## 执行监听 ##
    ' l9 s" }* |  g8 f7 Y# t& t% I
  28.     $inner_text_worker->listen();  U% m4 }, o6 c$ p6 v- g
  29. };
    , {) N5 S( ?/ C0 ?2 L$ l) ~9 l* `
  30. // 新增加一个属性,用来保存uid到connection的映射
    . i7 `" T" t7 E# ]; |, z& Y
  31. $worker->uidConnections = array();6 F( d" e0 E4 l, ?
  32. // 当有客户端发来消息时执行的回调函数
    * V8 h* j+ |4 T& U/ W
  33. $worker->onMessage = function($connection, $data)9 V" Q' G4 P( X8 S  Q
  34. {  ~$ q4 r/ W) [% b( Z
  35.     global $worker;
    ! {- r6 I% H: u" j# W) j
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    * t3 f& n0 f/ q- l& d# u, }
  37.     if(!isset($connection->uid))0 Y9 m3 |: Y$ Q7 \3 L& p$ X
  38.     {
    5 V0 a# B. Q5 b
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)) |5 z9 k9 Q4 z9 _0 C/ Q. E
  40.        $connection->uid = $data;2 u6 w3 G3 a( Y
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    4 P* g; v" u2 X1 J! i) B
  42.         * 实现针对特定uid推送数据8 F. U1 B4 ~5 e
  43.         */5 j% `4 {9 |" D3 u7 L# N. \% J
  44.        $worker->uidConnections[$connection->uid] = $connection;6 t0 H! @- o8 v, J1 X; A6 O) q
  45.        return;
    0 W* Q" s1 m2 h- L* e& r
  46.     }6 w" l: l  Z# |+ S  L
  47. };2 A. l1 |  I# D- g, y6 T8 j
  48. ( E' f2 Z) r6 [' z, m
  49. // 当有客户端连接断开时  j& {6 t5 B9 X; s1 \2 x
  50. $worker->onClose = function($connection)$ K" Y4 r+ s: g* ~4 O; x0 a
  51. {
    - L, B4 v9 n8 ?, ?3 F
  52.     global $worker;2 G# _+ O7 y7 k* h7 v3 M* {) m" l
  53.     if(isset($connection->uid))0 b5 Z3 J. p/ {- v/ q1 ]
  54.     {
    ! w' }/ S4 E9 ?4 K0 |
  55.         // 连接断开时删除映射
    4 C% w( H0 W# ~
  56.         unset($worker->uidConnections[$connection->uid]);
    6 I& {% {* P- }; h9 l( d
  57.     }
    9 x+ ]; O) I' n6 D
  58. };) @( X- l# i% I+ N$ J+ a1 I0 t8 P  b

  59. ( p$ e' s! \/ r3 y
  60. // 向所有验证的用户推送数据
    5 [6 ?: h5 t' H8 A" f/ `* [$ x+ v. D
  61. function broadcast($message)
    3 M& u% ?; V. p( w1 l! r0 l+ `. T' Y
  62. {% q* ]0 h( I) A% a! h4 `% x
  63.    global $worker;' i3 b1 ?) L1 Y, V8 b
  64.    foreach($worker->uidConnections as $connection)
    % R. `) a! {, g& D5 Y) Y
  65.    {3 o) L7 o" M6 [' s  u% q* A
  66.         $connection->send($message);; N% U. K/ ~; L( @5 u& ?7 j" u& N
  67.    }
    8 E  _. Q; i/ ]* V
  68. }
    $ P( i" H/ h3 c7 R

  69. 4 U2 c0 u* S2 k
  70. // 针对uid推送数据
    8 Y. J4 z1 u% v4 m8 O% W
  71. function sendMessageByUid($uid, $message)& T- L# `. C/ F8 [7 D" x6 j5 o5 y/ f  ^
  72. {
    5 L: R4 r. @, h! o$ Z8 e0 z( V
  73.     global $worker;
      `% f; O0 P; u
  74.     if(isset($worker->uidConnections[$uid]))7 R) H, e4 c" m0 f: Q
  75.     {' C. o( s0 [4 D4 u3 e" f+ P3 ]9 N
  76.         $connection = $worker->uidConnections[$uid];
    7 k) I( G, I/ Q9 y4 |8 ?
  77.         $connection->send($message);
    8 t( G4 a6 Z# i& P
  78.         return true;# |  U! ~9 c1 N+ z
  79.     }
    8 B2 q* I2 W. K8 Y0 T
  80.     return false;
    9 f4 c% g6 U$ l
  81. }9 M( P* ~$ A8 n% y
  82. 2 x4 K5 T" B4 `3 x. N
  83. // 运行所有的worker
      u% H# L+ x4 z6 P
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');3 P" J1 u9 k: l
  2. ws.onopen = function(){2 c- s5 e7 ]6 l& w
  3.     var uid = 'uid1';* f- w5 o, t  ?' X
  4.     ws.send(uid);
    + C! L' w1 _  m
  5. };; X9 F) `* W& |- y& u" w% `
  6. ws.onmessage = function(e){
    % `9 ~  a1 f- S" _
  7.     alert(e.data);2 }' o! Y0 w0 {% b' m5 @* K4 h, h
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    : U/ F5 H! f5 z3 t# I
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);& s4 b9 M! v( [5 ~3 k6 k! X% d
  3. // 推送的数据,包含uid字段,表示是给这个uid推送7 @/ V5 C% N, [* ]# O% e
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');: [6 ^+ M- y7 k* f5 w5 M
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符* c: p8 y; |$ L+ h0 K! g
  6. fwrite($client, json_encode($data)."\n");
    5 C6 U& ^: Q2 }+ ?& y
  7. // 读取推送结果
    4 z8 d6 p0 a0 _  s
  8. echo fread($client, 8192);
复制代码

/ J$ N) Z8 Z$ C9 z: k; S  Q/ H9 r& x8 D9 n
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-17 16:05 , Processed in 0.114587 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!